You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/03 19:09:05 UTC
[kafka] branch 1.0 updated: KAFKA-6256: fix flaky test
KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new ad56e9a KAFKA-6256: fix flaky test KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
ad56e9a is described below
commit ad56e9aa06f5f752b16de1e7ae768bb8d00f8e89
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jan 3 11:08:43 2018 -0800
KAFKA-6256: fix flaky test KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
Increase the commit interval to make it less likely that the cache is flushed in between.
To make the test fool-proof, compare only the "final" result records when the cache is enabled.
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4364 from mjsax/kafka-6256-flaky-kstream-ktable-join-with-caching-test
(cherry picked from commit 6869f952326f72f108250dbbdd8253cf95410633)
Signed-off-by: Guozhang Wang <wa...@gmail.com>
---
.../KStreamKTableJoinIntegrationTest.java | 61 +++++++++++++++++-----
1 file changed, 48 insertions(+), 13 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 8d4299b..f90dc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -33,13 +32,16 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -49,8 +51,11 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -62,7 +67,7 @@ import static org.junit.Assert.assertThat;
@Category({IntegrationTest.class})
public class KStreamKTableJoinIntegrationTest {
private static final int NUM_BROKERS = 1;
- private static final long COMMIT_INTERVAL_MS = 300L;
+ private static final long COMMIT_INTERVAL_MS = 1000L;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -90,11 +95,8 @@ public class KStreamKTableJoinIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
- TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-
}
@After
@@ -185,6 +187,11 @@ public class KStreamKTableJoinIntegrationTest {
new KeyValue<>("europe", 109L),
new KeyValue<>("asia", 124L)
);
+ final Map<String, Long> remainingExpectedResult = new HashMap<>();
+ for (final KeyValue<String, Long> record : expectedClicksPerRegion) {
+ remainingExpectedResult.put(record.key, record.value);
+ }
+ final AtomicInteger outputCounter = new AtomicInteger();
//
// Step 1: Configure and start the processor topology.
@@ -251,10 +258,23 @@ public class KStreamKTableJoinIntegrationTest {
public Long apply(final Long value1, final Long value2) {
return value1 + value2;
}
- }, "ClicksPerRegionUnwindowed");
+ });
// Write the (continuously updating) results to the output topic.
- clicksPerRegion.to(stringSerde, longSerde, outputTopic);
+ clicksPerRegion
+ .toStream()
+ .peek(new ForeachAction<String, Long>() {
+ @Override
+ public void apply(final String key, final Long value) {
+ outputCounter.incrementAndGet();
+
+ final long finalValue = remainingExpectedResult.get(key);
+ if (value == finalValue) {
+ remainingExpectedResult.remove(key);
+ }
+ }
+ })
+ .to(outputTopic, Produced.with(stringSerde, longSerde));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
@@ -273,7 +293,6 @@ public class KStreamKTableJoinIntegrationTest {
userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions, userRegionsProducerConfig, mockTime);
-
//
// Step 3: Publish some user click events.
//
@@ -295,10 +314,26 @@ public class KStreamKTableJoinIntegrationTest {
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- final List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
- outputTopic, expectedClicksPerRegion.size());
-
- assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return remainingExpectedResult.isEmpty();
+ }
+ }, "Never received expected result.");
+
+ final int expectedResultSize = expectedClicksPerRegion.size();
+ final int expectedNumberOfOutputs = (cacheSizeBytes == 0)
+ ? expectedResultSize
+ : Math.max(expectedResultSize, outputCounter.get());
+ final List<KeyValue<String, Long>> actualClicksPerRegion =
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ consumerConfig,
+ outputTopic,
+ expectedNumberOfOutputs);
+
+ assertThat(
+ actualClicksPerRegion.subList(expectedNumberOfOutputs - expectedResultSize, expectedNumberOfOutputs),
+ equalTo(expectedClicksPerRegion));
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].