You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/11/12 19:18:43 UTC
[kafka] branch 2.1 updated: KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b1aa80c KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)
b1aa80c is described below
commit b1aa80c4f60d3f19b5daac5eb9789d7570bc415d
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Mon Nov 12 20:17:55 2018 +0100
KAFKA-7518: Fix FutureRecordMetadata.get when TimeUnit is not ms (#5815)
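Convert the timeout to milliseconds before computing the deadline; previously the
raw `timeout` value was added to the current time in milliseconds regardless of `unit`.
Also check for timeout before calling `nextRecordMetadata.get`. Added a unit test
validating the fix.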
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/producer/MockProducer.java | 4 +-
.../producer/internals/FutureRecordMetadata.java | 20 +++---
.../producer/internals/ProduceRequestResult.java | 2 +-
.../clients/producer/internals/ProducerBatch.java | 7 +-
.../kafka/clients/producer/RecordSendTest.java | 7 +-
.../internals/FutureRecordMetadataTest.java | 82 ++++++++++++++++++++++
6 files changed, 107 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a38bd04..e448d6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -254,7 +255,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
partition = partition(record, this.cluster);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
- FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
+ 0L, 0, 0, Time.SYSTEM);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 8fcc46f..d1a643b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -16,13 +16,14 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Time;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
/**
* The future result of a record send
*/
@@ -34,16 +35,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final Long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
+ private final Time time;
private volatile FutureRecordMetadata nextRecordMetadata = null;
public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
- Long checksum, int serializedKeySize, int serializedValueSize) {
+ Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
this.result = result;
this.relativeOffset = relativeOffset;
this.createTimestamp = createTimestamp;
this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
+ this.time = time;
}
@Override
@@ -67,13 +70,14 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// Handle overflow.
- long now = System.currentTimeMillis();
- long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
+ long now = time.milliseconds();
+ long timeoutMillis = unit.toMillis(timeout);
+ long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
boolean occurred = this.result.await(timeout, unit);
- if (nextRecordMetadata != null)
- return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (!occurred)
- throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+ throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms.");
+ if (nextRecordMetadata != null)
+ return nextRecordMetadata.get(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);
return valueOrError();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index fbfef61..1e8c787 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* partition in a produce request and it is shared by all the {@link RecordMetadata} instances that are batched together
* for the same partition in the request.
*/
-public final class ProduceRequestResult {
+public class ProduceRequestResult {
private final CountDownLatch latch = new CountDownLatch(1);
private final TopicPartition topicPartition;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 0adbbf9..80372cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +110,8 @@ public final class ProducerBatch {
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
- value == null ? -1 : value.length);
+ value == null ? -1 : value.length,
+ Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
@@ -133,7 +135,8 @@ public final class ProducerBatch {
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, thunk.future.checksumOrNull(),
key == null ? -1 : key.remaining(),
- value == null ? -1 : value.remaining());
+ value == null ? -1 : value.remaining(),
+ Time.SYSTEM);
// Chain the future to the original thunk.
thunk.future.chain(future);
this.thunks.add(thunk);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index c083db3..45be117 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
import org.junit.Test;
public class RecordSendTest {
@@ -46,7 +47,7 @@ public class RecordSendTest {
public void testTimeout() throws Exception {
ProduceRequestResult request = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
- RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+ RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
assertFalse("Request is not completed", future.isDone());
try {
future.get(5, TimeUnit.MILLISECONDS);
@@ -66,7 +67,7 @@ public class RecordSendTest {
@Test(expected = ExecutionException.class)
public void testError() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
- relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+ relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
future.get();
}
@@ -76,7 +77,7 @@ public class RecordSendTest {
@Test
public void testBlocking() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
- relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+ relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
assertEquals(baseOffset + relOffset, future.get().offset());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
new file mode 100644
index 0000000..aee24e8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FutureRecordMetadataTest {
+
+ private final MockTime time = new MockTime();
+
+ @Test
+ public void testFutureGetWithSeconds() throws ExecutionException, InterruptedException, TimeoutException {
+ ProduceRequestResult produceRequestResult = mockProduceRequestResult();
+ FutureRecordMetadata future = futureRecordMetadata(produceRequestResult);
+
+ ProduceRequestResult chainedProduceRequestResult = mockProduceRequestResult();
+ future.chain(futureRecordMetadata(chainedProduceRequestResult));
+
+ future.get(1L, TimeUnit.SECONDS);
+
+ verify(produceRequestResult).await(1L, TimeUnit.SECONDS);
+ verify(chainedProduceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testFutureGetWithMilliSeconds() throws ExecutionException, InterruptedException, TimeoutException {
+ ProduceRequestResult produceRequestResult = mockProduceRequestResult();
+ FutureRecordMetadata future = futureRecordMetadata(produceRequestResult);
+
+ ProduceRequestResult chainedProduceRequestResult = mockProduceRequestResult();
+ future.chain(futureRecordMetadata(chainedProduceRequestResult));
+
+ future.get(1000L, TimeUnit.MILLISECONDS);
+
+ verify(produceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+ verify(chainedProduceRequestResult).await(1000L, TimeUnit.MILLISECONDS);
+ }
+
+ private FutureRecordMetadata futureRecordMetadata(ProduceRequestResult produceRequestResult) {
+ return new FutureRecordMetadata(
+ produceRequestResult,
+ 0,
+ RecordBatch.NO_TIMESTAMP,
+ 0L,
+ 0,
+ 0,
+ time
+ );
+ }
+
+ private ProduceRequestResult mockProduceRequestResult() throws InterruptedException {
+ ProduceRequestResult mockProduceRequestResult = mock(ProduceRequestResult.class);
+ when(mockProduceRequestResult.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
+ return mockProduceRequestResult;
+ }
+}