You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/11 05:48:48 UTC

kafka git commit: KAFKA-4476: Kafka Streams gets stuck if metadata is missing

Repository: kafka
Updated Branches:
  refs/heads/trunk 8e9e17767 -> 1d586cb50


KAFKA-4476: Kafka Streams gets stuck if metadata is missing

 - break loop in StreamPartitionAssignor.assign() in case partition metadata is missing
 - fix state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised)
 - some test improvements

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang

Closes #2209 from mjsax/kafka-4476-stuck-on-missing-metadata


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d586cb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d586cb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d586cb5

Branch: refs/heads/trunk
Commit: 1d586cb50a94540f6b931a8d525ba75273f314f0
Parents: 8e9e177
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sat Dec 10 21:48:44 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Dec 10 21:48:44 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 28 +++++---
 .../integration/ResetIntegrationTest.java       | 54 ++++++---------
 .../internals/StreamPartitionAssignorTest.java  | 73 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 84f78dc..7e15f70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -59,6 +59,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
 
+    public final static int UNKNOWN = -1;
+    public final static int NOT_AVAILABLE = -2;
+
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
         public final TopicPartition partition;
@@ -128,7 +131,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         InternalTopicMetadata(final InternalTopicConfig config) {
             this.config = config;
-            this.numPartitions = -1;
+            this.numPartitions = UNKNOWN;
         }
     }
 
@@ -140,7 +143,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             if (result != 0) {
                 return result;
             } else {
-                return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
+                return p1.partition() < p2.partition() ? UNKNOWN : (p1.partition() > p2.partition() ? 1 : 0);
             }
         }
     };
@@ -311,7 +314,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
 
                     // try set the number of partitions for this repartition topic if it is not set yet
-                    if (numPartitions == -1) {
+                    if (numPartitions == UNKNOWN) {
                         for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
@@ -326,6 +329,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                                         numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
                                     } else {
                                         numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+                                        if (numPartitionsCandidate == null) {
+                                            repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
+                                        }
                                     }
 
                                     if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
@@ -337,7 +343,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                         // if we still have not find the right number of partitions,
                         // another iteration is needed
-                        if (numPartitions == -1)
+                        if (numPartitions == UNKNOWN)
                             numPartitionsNeeded = true;
                         else
                             repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
@@ -429,7 +435,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
             for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
                 // the expected number of partitions is the max value of TaskId.partition + 1
-                int numPartitions = -1;
+                int numPartitions = UNKNOWN;
                 if (tasksByTopicGroup.get(topicGroupId) != null) {
                     for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
                         if (numPartitions < task.partition + 1)
@@ -607,8 +613,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 InternalTopicConfig topic = entry.getValue().config;
                 Integer numPartitions = entry.getValue().numPartitions;
 
-                if (numPartitions < 0)
+                if (numPartitions == NOT_AVAILABLE) {
+                    continue;
+                }
+                if (numPartitions < 0) {
                     throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+                }
 
                 internalTopicManager.makeReady(topic, numPartitions);
 
@@ -647,7 +657,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private void ensureCopartitioning(Set<String> copartitionGroup,
                                       Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                                       Cluster metadata) {
-        int numPartitions = -1;
+        int numPartitions = UNKNOWN;
 
         for (String topic : copartitionGroup) {
             if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
@@ -656,7 +666,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 if (partitions == null)
                     throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
 
-                if (numPartitions == -1) {
+                if (numPartitions == UNKNOWN) {
                     numPartitions = partitions;
                 } else if (numPartitions != partitions) {
                     String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
@@ -668,7 +678,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // if all topics for this co-partition group is repartition topics,
         // then set the number of partitions to be the maximum of the number of partitions.
-        if (numPartitions == -1) {
+        if (numPartitions == UNKNOWN) {
             for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
                 if (copartitionGroup.contains(entry.getKey())) {
                     int partitions = entry.getValue().numPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 6ed2ffd..efb5c81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -47,7 +46,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -137,7 +136,7 @@ public class ResetIntegrationTest {
         }
     }
 
-    @Ignore
+    @Test
     public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
         CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
 
@@ -158,16 +157,22 @@ public class ResetIntegrationTest {
             60000);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
-        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2,
-            1
-        ).get(0);
+            10
+        );
 
         streams.close();
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
+        // insert bad record to maks sure intermediate user topic gets seekToEnd()
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                INTERMEDIATE_USER_TOPIC,
+                Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), mockTime.milliseconds());
+
         // RESET
         streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
@@ -184,11 +189,11 @@ public class ResetIntegrationTest {
             OUTPUT_TOPIC,
             10,
             60000);
-        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2_RERUN,
-            1
-        ).get(0);
+            10
+        );
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -219,7 +224,7 @@ public class ResetIntegrationTest {
         }
     }
 
-    @Ignore
+    @Test
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
         final Properties streamsConfiguration = prepareTest();
         final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
@@ -274,7 +279,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
@@ -316,32 +321,17 @@ public class ResetIntegrationTest {
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
         // use map to trigger internal re-partitioning before groupByKey
-        final KTable<Long, Long> globalCounts = input
-            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                 @Override
                 public KeyValue<Long, String> apply(final Long key, final String value) {
                     return new KeyValue<>(key, value);
                 }
             })
             .groupByKey()
-            .count("global-count");
-        globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
-
-        final KStream<Long, Long> windowedCounts = input
-            .through(INTERMEDIATE_USER_TOPIC)
-            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
-                private long sleep = 1000;
+            .count("global-count")
+            .to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
 
-                @Override
-                public KeyValue<Long, String> apply(final Long key, final String value) {
-                    // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
-                    // => want to test "skip over" unprocessed records
-                    // increasing the sleep time only has disadvantage that test run time is increased
-                    mockTime.sleep(sleep);
-                    sleep *= 2;
-                    return new KeyValue<>(key, value);
-                }
-            })
+        input.through(INTERMEDIATE_USER_TOPIC)
             .groupByKey()
             .count(TimeWindows.of(35).advanceBy(10), "count")
             .toStream()
@@ -350,8 +340,8 @@ public class ResetIntegrationTest {
                 public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
                     return new KeyValue<>(key.window().start() + key.window().end(), value);
                 }
-            });
-        windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);
+            })
+            .to(Serdes.Long(), Serdes.Long(), outputTopic2);
 
         return builder;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 3730785..d40956e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -28,6 +28,11 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -51,9 +56,11 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 public class StreamPartitionAssignorTest {
 
@@ -812,7 +819,73 @@ public class StreamPartitionAssignorTest {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
+    }
+
+    @Test
+    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
+        final String applicationId = "application-id";
 
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+
+        KStream<Object, Object> stream1 = builder
+            .stream("topic1")
+            .selectKey(new KeyValueMapper<Object, Object, Object>() {
+                @Override
+                public Object apply(Object key, Object value) {
+                    return null;
+                }
+            })
+            .through("topic2");
+        builder
+            .stream("unknownTopic")
+            .selectKey(new KeyValueMapper<Object, Object, Object>() {
+                @Override
+                public Object apply(Object key, Object value) {
+                    return null;
+                }
+            })
+            .join(
+                stream1,
+                new ValueJoiner() {
+                    @Override
+                    public Object apply(Object value1, Object value2) {
+                        return null;
+                    }
+                },
+                JoinWindows.of(0)
+            );
+
+        final UUID uuid = UUID.randomUUID();
+        final String client = "client1";
+
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put(
+            client,
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("unknownTopic"),
+                new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+            )
+        );
+
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        final List<TopicPartition> expectedAssignment = Arrays.asList(
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2),
+            new TopicPartition("topic2", 0),
+            new TopicPartition("topic2", 1),
+            new TopicPartition("topic2", 2)
+        );
+        assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
     }
 
     private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {