You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 19:27:28 UTC
kafka git commit: KAFKA-5124: autocommit reset earliest fixes race condition
Repository: kafka
Updated Branches:
refs/heads/trunk f7b7b4745 -> d06f2cc7a
KAFKA-5124: autocommit reset earliest fixes race condition
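Fixes a race condition in `org.apache.kafka.streams.integration.utils.IntegrationTestUtils#readKeyValues`: the test consumer could start to `poll` for stream output only after the stream had already finished sending the test data, and would then miss that data when working with `latest` offsets. The test consumer is now configured with `auto.offset.reset=earliest`, so output produced before the first `poll` is still read.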
Author: Armin Braun <me...@obrown.io>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2921 from original-brownbear/KAFKA-5124
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d06f2cc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d06f2cc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d06f2cc7
Branch: refs/heads/trunk
Commit: d06f2cc7ac8d214edbefa57addc82312fbae884f
Parents: f7b7b47
Author: Armin Braun <me...@obrown.io>
Authored: Wed Apr 26 12:27:24 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 26 12:27:24 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/integration/KTableKTableJoinIntegrationTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d06f2cc7/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 43b9f72..15c955a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -112,6 +112,7 @@ public class KTableKTableJoinIntegrationTest {
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
@Before
@@ -218,7 +219,6 @@ public class KTableKTableJoinIntegrationTest {
streams = prepareTopology(joinType1, joinType2);
streams.start();
-
final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,