Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/17 15:02:01 UTC

[kafka] branch trunk updated: MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest (#8884)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c2dfcf  MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest (#8884)
0c2dfcf is described below

commit 0c2dfcfc85ccf2ab06a69910f5a991720880d7ba
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 17 10:01:30 2020 -0500

    MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest (#8884)
    
    Reduce the test data set from 1000 records to 500.
    Some recent test failures indicate that the Jenkins runners aren't
    able to process all 1000 records in two minutes.
    
    Also add a sanity check that all the test data are readable from the
    input topic.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>
---
 ...ighAvailabilityTaskAssignorIntegrationTest.java | 91 ++++++++++++++--------
 1 file changed, 57 insertions(+), 34 deletions(-)
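
The sanity check hinges on a simple property of Consumer#endOffsets: absent deletion or compaction, summing the end offsets across a topic's partitions gives the number of records written to it. Below is a minimal, self-contained sketch of that pattern, not the test code itself; the class name, bootstrap server, and topic name are illustrative placeholders.

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EndOffsetSanityCheck {

    // Sum the end offsets of the given partitions. With producers writing to an
    // uncompacted topic, the sum equals the number of records written, so polling
    // this value until it reaches the expected count verifies the test data landed.
    static long endOffsetSum(final Consumer<?, ?> consumer, final Set<TopicPartition> partitions) {
        long sum = 0;
        for (final long endOffset : consumer.endOffsets(partitions).values()) {
            sum += endOffset;
        }
        return sum;
    }

    public static void main(final String[] args) {
        // "localhost:9092" and "input" are placeholders; the real test points at its
        // embedded cluster and a uniquely named per-test topic with two partitions.
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final Set<TopicPartition> partitions = Set.of(
            new TopicPartition("input", 0),
            new TopicPartition("input", 1)
        );

        try (final Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("end offset sum = " + endOffsetSum(consumer, partitions));
        }
    }
}

In the actual test (see the diff below), the same check is wrapped in TestUtils.waitForCondition with a two-minute timeout, retrying until the sum reaches the expected record count.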

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 725c805..7e7cf29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -95,6 +95,11 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
         final String testId = safeUniqueTestName(getClass(), testName);
         final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
         final String inputTopic = "input" + testId;
+        final Set<TopicPartition> inputTopicPartitions = mkSet(
+            new TopicPartition(inputTopic, 0),
+            new TopicPartition(inputTopic, 1)
+        );
+
         final String storeName = "store" + testId;
         final String storeChangelog = appId + "-store" + testId + "-changelog";
         final Set<TopicPartition> changelogTopicPartitions = mkSet(
@@ -124,46 +129,27 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
         builder.table(inputTopic, materializedFunction.apply(storeName));
         final Topology topology = builder.build();
 
-        final Properties producerProperties = mkProperties(
-            mkMap(
-                mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
-                mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
-                mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
-            )
-        );
-
-        final StringBuilder kiloBuilder = new StringBuilder(1000);
-        for (int i = 0; i < 1000; i++) {
-            kiloBuilder.append('0');
-        }
-        final String kilo = kiloBuilder.toString();
-
-        try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
-            for (int i = 0; i < 1000; i++) {
-                producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo));
-            }
-        }
-
-        final Properties consumerProperties = mkProperties(
-            mkMap(
-                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
-                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
-            )
-        );
+        final int numberOfRecords = 500;
 
+        produceTestData(inputTopic, numberOfRecords);
 
         try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
              final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
-             final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
+             final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
             kafkaStreams0.start();
 
+            // sanity check: just make sure we actually wrote all the input records
+            TestUtils.waitForCondition(
+                () -> getEndOffsetSum(inputTopicPartitions, consumer) == numberOfRecords,
+                120_000L,
+                () -> "Input records haven't all been written to the input topic: " + getEndOffsetSum(inputTopicPartitions, consumer)
+            );
+
             // wait until all the input records are in the changelog
             TestUtils.waitForCondition(
-                () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000,
+                () -> getEndOffsetSum(changelogTopicPartitions, consumer) == numberOfRecords,
                 120_000L,
-                () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer)
+                () -> "Input records haven't all been written to the changelog: " + getEndOffsetSum(changelogTopicPartitions, consumer)
             );
 
             final AtomicLong instance1TotalRestored = new AtomicLong(-1);
@@ -240,7 +226,44 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
         }
     }
 
-    private void assertFalseNoRetry(final boolean assertion, final String message) {
+    private void produceTestData(final String inputTopic, final int numberOfRecords) {
+        final String kilo = getKiloByteValue();
+
+        final Properties producerProperties = mkProperties(
+            mkMap(
+                mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
+                mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
+                mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
+            )
+        );
+
+        try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+            for (int i = 0; i < numberOfRecords; i++) {
+                producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo));
+            }
+        }
+    }
+
+    private static Properties getConsumerProperties() {
+        return mkProperties(
+                mkMap(
+                    mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                    mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
+                    mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
+                )
+            );
+    }
+
+    private static String getKiloByteValue() {
+        final StringBuilder kiloBuilder = new StringBuilder(1000);
+        for (int i = 0; i < 1000; i++) {
+            kiloBuilder.append('0');
+        }
+        return kiloBuilder.toString();
+    }
+
+    private static void assertFalseNoRetry(final boolean assertion, final String message) {
         if (assertion) {
             throw new NoRetryException(
                 new AssertionError(
@@ -268,8 +291,8 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
         );
     }
 
-    private static long getChangelogOffsetSum(final Set<TopicPartition> changelogTopicPartitions,
-                                              final Consumer<String, String> consumer) {
+    private static long getEndOffsetSum(final Set<TopicPartition> changelogTopicPartitions,
+                                        final Consumer<String, String> consumer) {
         long sum = 0;
         final Collection<Long> values = consumer.endOffsets(changelogTopicPartitions).values();
         for (final Long value : values) {