You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/15 11:42:55 UTC
[kafka] branch 2.0 updated: KAFKA-7032 The TimeUnit is neglected by KakfaConsumer#close(long, Tim… (#5182)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 72e1fd5 KAFKA-7032 The TimeUnit is neglected by KakfaConsumer#close(long, Tim… (#5182)
72e1fd5 is described below
commit 72e1fd5ca90eb89bb5068db3135a4da8e498bb3b
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Fri Jun 15 19:16:04 2018 +0800
KAFKA-7032 The TimeUnit is neglected by KakfaConsumer#close(long, Tim… (#5182)
---
.../org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../apache/kafka/clients/consumer/KafkaConsumerTest.java | 16 ++++++++++++++--
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 76e0fcc..342c559 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2081,7 +2081,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Deprecated
@Override
public void close(long timeout, TimeUnit timeUnit) {
- close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout)));
+ close(Duration.ofMillis(timeUnit.toMillis(timeout)));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 97ec082..316404b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -44,10 +44,10 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.IsolationLevel;
@@ -71,6 +71,7 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -1811,7 +1812,7 @@ public class KafkaConsumerTest {
requestTimeoutMs,
IsolationLevel.READ_UNCOMMITTED);
- return new KafkaConsumer<>(
+ return new KafkaConsumer<String, String>(
loggerFactory,
clientId,
consumerCoordinator,
@@ -1839,4 +1840,15 @@ public class KafkaConsumerTest {
this.count = count;
}
}
+
+ @Test
+ public void testCloseWithTimeUnit() {
+ KafkaConsumer consumer = EasyMock.partialMockBuilder(KafkaConsumer.class)
+ .addMockedMethod("close", Duration.class).createMock();
+ consumer.close(Duration.ofSeconds(1));
+ EasyMock.expectLastCall();
+ EasyMock.replay(consumer);
+ consumer.close(1, TimeUnit.SECONDS);
+ EasyMock.verify(consumer);
+ }
}
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.