You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:10:56 UTC
[4/4] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow
because it joins the same group for each topic
KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
- Bug-fix follow-up
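- Resetter fails if no intermediate topic is used, because calling seekToEnd() with an empty collection moves ALL assigned partitions to the end of the log (EOL), and those offsets are then committed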
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang
Closes #2138 from mjsax/kafka-4331-streams-resetter-bugfix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecb51680
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecb51680
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecb51680
Branch: refs/heads/0.10.1
Commit: ecb51680a982120691becc37c302aa78135dcfdf
Parents: 9d3003b
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 23 17:31:34 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 20:58:18 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/tools/StreamsResetter.java | 8 +-
.../integration/ResetIntegrationTest.java | 195 +++++++++++++++----
2 files changed, 164 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 1bb63f7..828eeef 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -218,8 +218,12 @@ public class StreamsResetter {
}
}
- client.seekToBeginning(inputAndInternalTopicPartitions);
- client.seekToEnd(intermediateTopicPartitions);
+ if (inputAndInternalTopicPartitions.size() > 0) {
+ client.seekToBeginning(inputAndInternalTopicPartitions);
+ }
+ if (intermediateTopicPartitions.size() > 0) {
+ client.seekToEnd(intermediateTopicPartitions);
+ }
for (final TopicPartition p : partitions) {
client.position(p);
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d07970f..5f85536 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -21,6 +21,7 @@ import kafka.tools.StreamsResetter;
import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -40,7 +41,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -75,10 +76,11 @@ public class ResetIntegrationTest {
private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+ private static final int TIMEOUT_MULTIPLYER = 5;
private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
-
- private AdminClient adminClient = null;
+ private static int testNo = 0;
+ private static AdminClient adminClient = null;
@BeforeClass
public static void startKafkaCluster() throws Exception {
@@ -86,24 +88,48 @@ public class ResetIntegrationTest {
CLUSTER.createTopic(OUTPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC_2);
CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
- CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
- }
-
- @Before
- public void prepare() {
- adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
}
- @After
- public void cleanup() {
+ @AfterClass
+ public static void globalCleanup() {
if (adminClient != null) {
adminClient.close();
adminClient = null;
}
}
+ @Before
+ public void cleanup() throws Exception {
+ ++testNo;
+
+ if (adminClient == null) {
+ adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
+ }
+
+ // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
+ while (true) {
+ Thread.sleep(50);
+
+ try {
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+ "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ } catch (GroupCoordinatorNotAvailableException e) {
+ continue;
+ } catch (IllegalArgumentException e) {
+ continue;
+ }
+ break;
+ }
+
+ if (testNo == 1) {
+ prepareInputData();
+ }
+ }
+
@Test
- public void testReprocessingFromScratchAfterReset() throws Exception {
+ public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+ CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
+
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
@@ -111,11 +137,8 @@ public class ResetIntegrationTest {
LongDeserializer.class,
LongDeserializer.class);
- prepareInputData();
- final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2);
-
// RUN
- KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
@@ -131,17 +154,17 @@ public class ResetIntegrationTest {
).get(0);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
- streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+ streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.cleanUp();
- cleanGlobal();
- TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ cleanGlobal(INTERMEDIATE_USER_TOPIC);
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- assertInternalTopicsGotDeleted();
+ assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
// RE-RUN
streams.start();
@@ -159,11 +182,82 @@ public class ResetIntegrationTest {
assertThat(resultRerun, equalTo(result));
assertThat(resultRerun2, equalTo(result2));
+
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ cleanGlobal(INTERMEDIATE_USER_TOPIC);
+
+ CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
+ Set<String> allTopics;
+ ZkUtils zkUtils = null;
+ try {
+ zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+
+ do {
+ Utils.sleep(100);
+ allTopics = new HashSet<>();
+ allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+ } while (allTopics.contains(INTERMEDIATE_USER_TOPIC));
+ } finally {
+ if (zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+ }
+
+ @Test
+ public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+ final Properties streamsConfiguration = prepareTest();
+ final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+ LongDeserializer.class,
+ LongDeserializer.class);
+
+ // RUN
+ KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams.start();
+ final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC,
+ 10,
+ 60000);
+
+ streams.close();
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+ // RESET
+ streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams.cleanUp();
+ cleanGlobal(null);
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+ assertInternalTopicsGotDeleted(null);
+
+ // RE-RUN
+ streams.start();
+ final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ resultTopicConsumerConfig,
+ OUTPUT_TOPIC,
+ 10,
+ 60000);
+ streams.close();
+
+ assertThat(resultRerun, equalTo(result));
+
+ TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ cleanGlobal(null);
}
private Properties prepareTest() throws Exception {
final Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -205,7 +299,7 @@ public class ResetIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
- private KStreamBuilder setupTopology(final String outputTopic2) {
+ private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@@ -251,27 +345,54 @@ public class ResetIntegrationTest {
return builder;
}
- private void cleanGlobal() {
- final Properties cleanUpConfig = new Properties();
- cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
- cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+ private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
- final int exitCode = new StreamsResetter().run(
- new String[]{
- "--application-id", APP_ID,
+ // use map to trigger internal re-partitioning before groupByKey
+ input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
+ @Override
+ public KeyValue<Long, Long> apply(final Long key, final String value) {
+ return new KeyValue<>(key, key);
+ }
+ }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
+
+ return builder;
+ }
+
+ private void cleanGlobal(final String intermediateUserTopic) {
+ final String[] parameters;
+ if (intermediateUserTopic != null) {
+ parameters = new String[]{
+ "--application-id", APP_ID + testNo,
"--bootstrap-server", CLUSTER.bootstrapServers(),
"--zookeeper", CLUSTER.zKConnectString(),
"--input-topics", INPUT_TOPIC,
"--intermediate-topics", INTERMEDIATE_USER_TOPIC
- },
- cleanUpConfig);
+ };
+ } else {
+ parameters = new String[]{
+ "--application-id", APP_ID + testNo,
+ "--bootstrap-server", CLUSTER.bootstrapServers(),
+ "--zookeeper", CLUSTER.zKConnectString(),
+ "--input-topics", INPUT_TOPIC
+ };
+ }
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+ final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
Assert.assertEquals(0, exitCode);
}
- private void assertInternalTopicsGotDeleted() {
+ private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
- expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
+ if (intermediateUserTopic != null) {
+ expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
+ }
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
@@ -301,7 +422,7 @@ public class ResetIntegrationTest {
private class WaitUntilConsumerGroupGotClosed implements TestCondition {
@Override
public boolean conditionMet() {
- return adminClient.describeGroup(APP_ID).members().isEmpty();
+ return adminClient.describeGroup(APP_ID + testNo).members().isEmpty();
}
}