You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 19:27:28 UTC

kafka git commit: KAFKA-5124: autocommit reset earliest fixes race condition

Repository: kafka
Updated Branches:
  refs/heads/trunk f7b7b4745 -> d06f2cc7a


KAFKA-5124: autocommit reset earliest fixes race condition

Fixes `org.apache.kafka.streams.integration.utils.IntegrationTestUtils#readKeyValues` potentially starting to `poll` for stream output after the stream finished sending the test data and hence missing it when working with `latest` offsets.

Author: Armin Braun <me...@obrown.io>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2921 from original-brownbear/KAFKA-5124


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d06f2cc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d06f2cc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d06f2cc7

Branch: refs/heads/trunk
Commit: d06f2cc7ac8d214edbefa57addc82312fbae884f
Parents: f7b7b47
Author: Armin Braun <me...@obrown.io>
Authored: Wed Apr 26 12:27:24 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 26 12:27:24 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/integration/KTableKTableJoinIntegrationTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d06f2cc7/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 43b9f72..15c955a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -112,6 +112,7 @@ public class KTableKTableJoinIntegrationTest {
         CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
         CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
     @Before
@@ -218,7 +219,6 @@ public class KTableKTableJoinIntegrationTest {
         streams = prepareTopology(joinType1, joinType2);
         streams.start();
 
-
         final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 CONSUMER_CONFIG,
                 OUTPUT,