You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/03 19:08:54 UTC

[kafka] branch trunk updated: KAFKA-6256: fix flaky test KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6869f95  KAFKA-6256: fix flaky test KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
6869f95 is described below

commit 6869f952326f72f108250dbbdd8253cf95410633
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jan 3 11:08:43 2018 -0800

    KAFKA-6256: fix flaky test KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
    
    Increase the commit interval to make it less likely that the cache is flushed in between.
    To make the test fool-proof, compare only the "final" result records when the cache is enabled.
    
    Author: Matthias J. Sax <ma...@confluent.io>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    
    Closes #4364 from mjsax/kafka-6256-flaky-kstream-ktable-join-with-caching-test
---
 .../KStreamKTableJoinIntegrationTest.java          | 61 +++++++++++++++++-----
 1 file changed, 48 insertions(+), 13 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 8d4299b..f90dc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -33,13 +32,16 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -49,8 +51,11 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -62,7 +67,7 @@ import static org.junit.Assert.assertThat;
 @Category({IntegrationTest.class})
 public class KStreamKTableJoinIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final long COMMIT_INTERVAL_MS = 300L;
+    private static final long COMMIT_INTERVAL_MS = 1000L;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -90,11 +95,8 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-            TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-
     }
 
     @After
@@ -185,6 +187,11 @@ public class KStreamKTableJoinIntegrationTest {
                 new KeyValue<>("europe", 109L),
                 new KeyValue<>("asia", 124L)
             );
+        final Map<String, Long> remainingExpectedResult = new HashMap<>();
+        for (final KeyValue<String, Long> record : expectedClicksPerRegion) {
+            remainingExpectedResult.put(record.key, record.value);
+        }
+        final AtomicInteger outputCounter = new AtomicInteger();
 
         //
         // Step 1: Configure and start the processor topology.
@@ -251,10 +258,23 @@ public class KStreamKTableJoinIntegrationTest {
                 public Long apply(final Long value1, final Long value2) {
                     return value1 + value2;
                 }
-            }, "ClicksPerRegionUnwindowed");
+            });
 
         // Write the (continuously updating) results to the output topic.
-        clicksPerRegion.to(stringSerde, longSerde, outputTopic);
+        clicksPerRegion
+            .toStream()
+            .peek(new ForeachAction<String, Long>() {
+                @Override
+                public void apply(final String key, final Long value) {
+                    outputCounter.incrementAndGet();
+
+                    final long finalValue = remainingExpectedResult.get(key);
+                    if (value == finalValue) {
+                        remainingExpectedResult.remove(key);
+                    }
+                }
+            })
+            .to(outputTopic, Produced.with(stringSerde, longSerde));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -273,7 +293,6 @@ public class KStreamKTableJoinIntegrationTest {
         userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions, userRegionsProducerConfig, mockTime);
 
-
         //
         // Step 3: Publish some user click events.
         //
@@ -295,10 +314,26 @@ public class KStreamKTableJoinIntegrationTest {
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
 
-        final List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-            outputTopic, expectedClicksPerRegion.size());
-
-        assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return remainingExpectedResult.isEmpty();
+            }
+        }, "Never received expected result.");
+
+        final int expectedResultSize = expectedClicksPerRegion.size();
+        final int expectedNumberOfOutputs = (cacheSizeBytes == 0)
+            ? expectedResultSize
+            : Math.max(expectedResultSize, outputCounter.get());
+        final List<KeyValue<String, Long>> actualClicksPerRegion =
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                consumerConfig,
+                outputTopic,
+                expectedNumberOfOutputs);
+
+        assertThat(
+            actualClicksPerRegion.subList(expectedNumberOfOutputs - expectedResultSize, expectedNumberOfOutputs),
+            equalTo(expectedClicksPerRegion));
     }
 
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].