Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:10:56 UTC

[4/4] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

  - bug-fix follow-up
  - Resetter fails if no intermediate topic is used, because seekToEnd() called with an empty collection commits ALL partitions to end-of-log (EOL); see the sketch below.
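
For context, a minimal sketch of the empty-collection guard (not the resetter code itself; it assumes the 0.10.x KafkaConsumer API, where seekToBeginning()/seekToEnd() invoked with an empty collection fall back to all currently assigned partitions):

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class SeekGuardSketch {
        // Only seek partition sets that are actually non-empty; an empty collection
        // would make the consumer seek ALL assigned partitions instead of none.
        static void resetOffsets(final KafkaConsumer<byte[], byte[]> client,
                                 final Collection<TopicPartition> inputAndInternalTopicPartitions,
                                 final Collection<TopicPartition> intermediateTopicPartitions) {
            if (!inputAndInternalTopicPartitions.isEmpty()) {
                client.seekToBeginning(inputAndInternalTopicPartitions);
            }
            if (!intermediateTopicPartitions.isEmpty()) {
                client.seekToEnd(intermediateTopicPartitions);
            }
        }
    }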

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang

Closes #2138 from mjsax/kafka-4331-streams-resetter-bugfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecb51680
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecb51680
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecb51680

Branch: refs/heads/0.10.1
Commit: ecb51680a982120691becc37c302aa78135dcfdf
Parents: 9d3003b
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 23 17:31:34 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 20:58:18 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java |   8 +-
 .../integration/ResetIntegrationTest.java       | 195 +++++++++++++++----
 2 files changed, 164 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 1bb63f7..828eeef 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -218,8 +218,12 @@ public class StreamsResetter {
                 }
             }
 
-            client.seekToBeginning(inputAndInternalTopicPartitions);
-            client.seekToEnd(intermediateTopicPartitions);
+            if (inputAndInternalTopicPartitions.size() > 0) {
+                client.seekToBeginning(inputAndInternalTopicPartitions);
+            }
+            if (intermediateTopicPartitions.size() > 0) {
+                client.seekToEnd(intermediateTopicPartitions);
+            }
 
             for (final TopicPartition p : partitions) {
                 client.position(p);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d07970f..5f85536 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -21,6 +21,7 @@ import kafka.tools.StreamsResetter;
 import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -40,7 +41,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -75,10 +76,11 @@ public class ResetIntegrationTest {
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+    private static final int TIMEOUT_MULTIPLYER = 5;
 
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
-
-    private AdminClient adminClient = null;
+    private static int testNo = 0;
+    private static AdminClient adminClient = null;
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
@@ -86,24 +88,48 @@ public class ResetIntegrationTest {
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_2);
         CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
-        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
-    }
-
-    @Before
-    public void prepare() {
-        adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
     }
 
-    @After
-    public void cleanup() {
+    @AfterClass
+    public static void globalCleanup() {
         if (adminClient != null) {
             adminClient.close();
             adminClient = null;
         }
     }
 
+    @Before
+    public void cleanup() throws Exception {
+        ++testNo;
+
+        if (adminClient == null) {
+            adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+        }
+
+        // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
+        while (true) {
+            Thread.sleep(50);
+
+            try {
+                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                        "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+            } catch (GroupCoordinatorNotAvailableException e) {
+                continue;
+            } catch (IllegalArgumentException e) {
+                continue;
+            }
+            break;
+        }
+
+        if (testNo == 1) {
+            prepareInputData();
+        }
+    }
+
     @Test
-    public void testReprocessingFromScratchAfterReset() throws Exception {
+    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
+
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
             CLUSTER.bootstrapServers(),
@@ -111,11 +137,8 @@ public class ResetIntegrationTest {
             LongDeserializer.class,
             LongDeserializer.class);
 
-        prepareInputData();
-        final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
-
         // RUN
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
         streams.start();
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
@@ -131,17 +154,17 @@ public class ResetIntegrationTest {
         ).get(0);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
-        cleanGlobal();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(INTERMEDIATE_USER_TOPIC);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
-        assertInternalTopicsGotDeleted();
+        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
 
         // RE-RUN
         streams.start();
@@ -159,11 +182,82 @@ public class ResetIntegrationTest {
 
         assertThat(resultRerun, equalTo(result));
         assertThat(resultRerun2, equalTo(result2));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(INTERMEDIATE_USER_TOPIC);
+
+        CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
+        Set<String> allTopics;
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                    30000,
+                    30000,
+                    JaasUtils.isZkSecurityEnabled());
+
+            do {
+                Utils.sleep(100);
+                allTopics = new HashSet<>();
+                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            } while (allTopics.contains(INTERMEDIATE_USER_TOPIC));
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+    @Test
+    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties streamsConfiguration = prepareTest();
+        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+                LongDeserializer.class,
+                LongDeserializer.class);
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                resultTopicConsumerConfig,
+                OUTPUT_TOPIC,
+                10,
+                60000);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+                "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.cleanUp();
+        cleanGlobal(null);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                resultTopicConsumerConfig,
+                OUTPUT_TOPIC,
+                10,
+                60000);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(null);
     }
 
     private Properties prepareTest() throws Exception {
         final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -205,7 +299,7 @@ public class ResetIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
     }
 
-    private KStreamBuilder setupTopology(final String outputTopic2) {
+    private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
         final KStreamBuilder builder = new KStreamBuilder();
 
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -251,27 +345,54 @@ public class ResetIntegrationTest {
         return builder;
     }
 
-    private void cleanGlobal() {
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+    private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
-        final int exitCode = new StreamsResetter().run(
-            new String[]{
-                "--application-id", APP_ID,
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
+            @Override
+            public KeyValue<Long, Long> apply(final Long key, final String value) {
+                return new KeyValue<>(key, key);
+            }
+        }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
+
+        return builder;
+    }
+
+    private void cleanGlobal(final String intermediateUserTopic) {
+        final String[] parameters;
+        if (intermediateUserTopic != null) {
+            parameters = new String[]{
+                "--application-id", APP_ID + testNo,
                 "--bootstrap-server", CLUSTER.bootstrapServers(),
                 "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC,
                 "--intermediate-topics", INTERMEDIATE_USER_TOPIC
-            },
-            cleanUpConfig);
+            };
+        } else {
+            parameters = new String[]{
+                "--application-id", APP_ID + testNo,
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--zookeeper", CLUSTER.zKConnectString(),
+                "--input-topics", INPUT_TOPIC
+            };
+        }
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
         Assert.assertEquals(0, exitCode);
     }
 
-    private void assertInternalTopicsGotDeleted() {
+    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
-        expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
+        if (intermediateUserTopic != null) {
+            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
+        }
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
@@ -301,7 +422,7 @@ public class ResetIntegrationTest {
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeGroup(APP_ID).members().isEmpty();
+            return adminClient.describeGroup(APP_ID + testNo).members().isEmpty();
         }
     }
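
For completeness, a minimal sketch (not part of this commit) of driving the reset tool for an application without intermediate user topics, mirroring cleanGlobal(null) above; the bootstrap/ZooKeeper addresses, application id, and input topic name are placeholders:

    import java.util.Properties;
    import kafka.tools.StreamsResetter;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public final class ResetWithoutIntermediateTopics {
        public static void main(final String[] args) {
            final Properties cleanUpConfig = new Properties();
            cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
            cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "2000");

            final int exitCode = new StreamsResetter().run(
                new String[]{
                    "--application-id", "my-streams-app",
                    "--bootstrap-server", "localhost:9092",
                    "--zookeeper", "localhost:2181",
                    "--input-topics", "my-input-topic"
                    // no --intermediate-topics: with this fix the tool no longer
                    // seeks (and commits) all partitions to end-of-log in this case
                },
                cleanUpConfig);

            System.out.println("reset exit code: " + exitCode);
        }
    }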