You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/11/12 19:18:43 UTC

[kafka] branch 2.1 updated: KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b1aa80c  KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)
b1aa80c is described below

commit b1aa80c4f60d3f19b5daac5eb9789d7570bc415d
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Mon Nov 12 20:17:55 2018 +0100

    KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)
    
    Also check for timeout before calling `nextRecordMetadata.get`. Added unit test
    validating the fix.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/producer/MockProducer.java       |  4 +-
 .../producer/internals/FutureRecordMetadata.java   | 20 +++---
 .../producer/internals/ProduceRequestResult.java   |  2 +-
 .../clients/producer/internals/ProducerBatch.java  |  7 +-
 .../kafka/clients/producer/RecordSendTest.java     |  7 +-
 .../internals/FutureRecordMetadataTest.java        | 82 ++++++++++++++++++++++
 6 files changed, 107 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a38bd04..e448d6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -254,7 +255,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
             partition = partition(record, this.cluster);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         ProduceRequestResult result = new ProduceRequestResult(topicPartition);
-        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
+                0L, 0, 0, Time.SYSTEM);
         long offset = nextOffset(topicPartition);
         Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
                 RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 8fcc46f..d1a643b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Time;
+
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.kafka.clients.producer.RecordMetadata;
-
 /**
  * The future result of a record send
  */
@@ -34,16 +35,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     private final Long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
+    private final Time time;
     private volatile FutureRecordMetadata nextRecordMetadata = null;
 
     public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
-                                Long checksum, int serializedKeySize, int serializedValueSize) {
+                                Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
         this.result = result;
         this.relativeOffset = relativeOffset;
         this.createTimestamp = createTimestamp;
         this.checksum = checksum;
         this.serializedKeySize = serializedKeySize;
         this.serializedValueSize = serializedValueSize;
+        this.time = time;
     }
 
     @Override
@@ -67,13 +70,14 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     @Override
     public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
         // Handle overflow.
-        long now = System.currentTimeMillis();
-        long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
+        long now = time.milliseconds();
+        long timeoutMillis = unit.toMillis(timeout);
+        long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
         boolean occurred = this.result.await(timeout, unit);
-        if (nextRecordMetadata != null)
-            return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
         if (!occurred)
-            throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+            throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms.");
+        if (nextRecordMetadata != null)
+            return nextRecordMetadata.get(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);
         return valueOrError();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index fbfef61..1e8c787 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
  * partition in a produce request and it is shared by all the {@link RecordMetadata} instances that are batched together
  * for the same partition in the request.
  */
-public final class ProduceRequestResult {
+public class ProduceRequestResult {
 
     private final CountDownLatch latch = new CountDownLatch(1);
     private final TopicPartition topicPartition;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 0adbbf9..80372cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +110,8 @@ public final class ProducerBatch {
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, checksum,
                                                                    key == null ? -1 : key.length,
-                                                                   value == null ? -1 : value.length);
+                                                                   value == null ? -1 : value.length,
+                                                                   Time.SYSTEM);
             // we have to keep every future returned to the users in case the batch needs to be
             // split to several new batches and resent.
             thunks.add(new Thunk(callback, future));
@@ -133,7 +135,8 @@ public final class ProducerBatch {
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, thunk.future.checksumOrNull(),
                                                                    key == null ? -1 : key.remaining(),
-                                                                   value == null ? -1 : value.remaining());
+                                                                   value == null ? -1 : value.remaining(),
+                                                                   Time.SYSTEM);
             // Chain the future to the original thunk.
             thunk.future.chain(future);
             this.thunks.add(thunk);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index c083db3..45be117 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
 public class RecordSendTest {
@@ -46,7 +47,7 @@ public class RecordSendTest {
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult(topicPartition);
         FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
-                RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+                RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
         assertFalse("Request is not completed", future.isDone());
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -66,7 +67,7 @@ public class RecordSendTest {
     @Test(expected = ExecutionException.class)
     public void testError() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
-                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
         future.get();
     }
 
@@ -76,7 +77,7 @@ public class RecordSendTest {
     @Test
     public void testBlocking() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
-                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
new file mode 100644
index 0000000..aee24e8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FutureRecordMetadataTest {
+
+    private final MockTime time = new MockTime();
+
+    @Test
+    public void testFutureGetWithSeconds() throws ExecutionException, InterruptedException, TimeoutException {
+        ProduceRequestResult produceRequestResult = mockProduceRequestResult();
+        FutureRecordMetadata future = futureRecordMetadata(produceRequestResult);
+
+        ProduceRequestResult chainedProduceRequestResult = mockProduceRequestResult();
+        future.chain(futureRecordMetadata(chainedProduceRequestResult));
+
+        future.get(1L, TimeUnit.SECONDS);
+
+        verify(produceRequestResult).await(1L, TimeUnit.SECONDS);
+        verify(chainedProduceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFutureGetWithMilliSeconds() throws ExecutionException, InterruptedException, TimeoutException {
+        ProduceRequestResult produceRequestResult = mockProduceRequestResult();
+        FutureRecordMetadata future = futureRecordMetadata(produceRequestResult);
+
+        ProduceRequestResult chainedProduceRequestResult = mockProduceRequestResult();
+        future.chain(futureRecordMetadata(chainedProduceRequestResult));
+
+        future.get(1000L, TimeUnit.MILLISECONDS);
+
+        verify(produceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+        verify(chainedProduceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    private FutureRecordMetadata futureRecordMetadata(ProduceRequestResult produceRequestResult) {
+        return new FutureRecordMetadata(
+                produceRequestResult,
+                0,
+                RecordBatch.NO_TIMESTAMP,
+                0L,
+                0,
+                0,
+                time
+        );
+    }
+
+    private ProduceRequestResult mockProduceRequestResult() throws InterruptedException {
+        ProduceRequestResult mockProduceRequestResult = mock(ProduceRequestResult.class);
+        when(mockProduceRequestResult.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        return mockProduceRequestResult;
+    }
+}