You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/11 05:48:48 UTC
kafka git commit: KAFKA-4476: Kafka Streams gets stuck if metadata is
missing
Repository: kafka
Updated Branches:
refs/heads/trunk 8e9e17767 -> 1d586cb50
KAFKA-4476: Kafka Streams gets stuck if metadata is missing
- break loop in StreamPartitionAssigner.assign() in case partition metadata is missing
- fix state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised)
- some test improvements
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang
Closes #2209 from mjsax/kafka-4476-stuck-on-missing-metadata
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d586cb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d586cb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d586cb5
Branch: refs/heads/trunk
Commit: 1d586cb50a94540f6b931a8d525ba75273f314f0
Parents: 8e9e177
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sat Dec 10 21:48:44 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Dec 10 21:48:44 2016 -0800
----------------------------------------------------------------------
.../internals/StreamPartitionAssignor.java | 28 +++++---
.../integration/ResetIntegrationTest.java | 54 ++++++---------
.../internals/StreamPartitionAssignorTest.java | 73 ++++++++++++++++++++
3 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 84f78dc..7e15f70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -59,6 +59,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+ public final static int UNKNOWN = -1;
+ public final static int NOT_AVAILABLE = -2;
+
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
@@ -128,7 +131,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
InternalTopicMetadata(final InternalTopicConfig config) {
this.config = config;
- this.numPartitions = -1;
+ this.numPartitions = UNKNOWN;
}
}
@@ -140,7 +143,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (result != 0) {
return result;
} else {
- return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
+ return p1.partition() < p2.partition() ? UNKNOWN : (p1.partition() > p2.partition() ? 1 : 0);
}
}
};
@@ -311,7 +314,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
- if (numPartitions == -1) {
+ if (numPartitions == UNKNOWN) {
for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
@@ -326,6 +329,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
} else {
numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+ if (numPartitionsCandidate == null) {
+ repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
+ }
}
if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
@@ -337,7 +343,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// if we still have not found the right number of partitions,
// another iteration is needed
- if (numPartitions == -1)
+ if (numPartitions == UNKNOWN)
numPartitionsNeeded = true;
else
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
@@ -429,7 +435,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
// the expected number of partitions is the max value of TaskId.partition + 1
- int numPartitions = -1;
+ int numPartitions = UNKNOWN;
if (tasksByTopicGroup.get(topicGroupId) != null) {
for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
if (numPartitions < task.partition + 1)
@@ -607,8 +613,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
InternalTopicConfig topic = entry.getValue().config;
Integer numPartitions = entry.getValue().numPartitions;
- if (numPartitions < 0)
+ if (numPartitions == NOT_AVAILABLE) {
+ continue;
+ }
+ if (numPartitions < 0) {
throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+ }
internalTopicManager.makeReady(topic, numPartitions);
@@ -647,7 +657,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private void ensureCopartitioning(Set<String> copartitionGroup,
Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
Cluster metadata) {
- int numPartitions = -1;
+ int numPartitions = UNKNOWN;
for (String topic : copartitionGroup) {
if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
@@ -656,7 +666,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (partitions == null)
throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
- if (numPartitions == -1) {
+ if (numPartitions == UNKNOWN) {
numPartitions = partitions;
} else if (numPartitions != partitions) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
@@ -668,7 +678,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// if all topics for this co-partition group are repartition topics,
// then set the number of partitions to be the maximum of the number of partitions.
- if (numPartitions == -1) {
+ if (numPartitions == UNKNOWN) {
for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
int partitions = entry.getValue().numPartitions;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 6ed2ffd..efb5c81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -47,7 +46,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
@@ -137,7 +136,7 @@ public class ResetIntegrationTest {
}
}
- @Ignore
+ @Test
public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
@@ -158,16 +157,22 @@ public class ResetIntegrationTest {
60000);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
- final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC_2,
- 1
- ).get(0);
+ 10
+ );
streams.close();
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ // insert bad record to make sure intermediate user topic gets seekToEnd()
+ mockTime.sleep(1);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ INTERMEDIATE_USER_TOPIC,
+ Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), mockTime.milliseconds());
+
// RESET
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.cleanUp();
@@ -184,11 +189,11 @@ public class ResetIntegrationTest {
OUTPUT_TOPIC,
10,
60000);
- final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC_2_RERUN,
- 1
- ).get(0);
+ 10
+ );
streams.close();
assertThat(resultRerun, equalTo(result));
@@ -219,7 +224,7 @@ public class ResetIntegrationTest {
}
}
- @Ignore
+ @Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
final Properties streamsConfiguration = prepareTest();
final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
@@ -274,7 +279,7 @@ public class ResetIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
+ streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
@@ -316,32 +321,17 @@ public class ResetIntegrationTest {
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
// use map to trigger internal re-partitioning before groupByKey
- final KTable<Long, Long> globalCounts = input
- .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+ input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
@Override
public KeyValue<Long, String> apply(final Long key, final String value) {
return new KeyValue<>(key, value);
}
})
.groupByKey()
- .count("global-count");
- globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
-
- final KStream<Long, Long> windowedCounts = input
- .through(INTERMEDIATE_USER_TOPIC)
- .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
- private long sleep = 1000;
+ .count("global-count")
+ .to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
- @Override
- public KeyValue<Long, String> apply(final Long key, final String value) {
- // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
- // => want to test "skip over" unprocessed records
- // increasing the sleep time only has disadvantage that test run time is increased
- mockTime.sleep(sleep);
- sleep *= 2;
- return new KeyValue<>(key, value);
- }
- })
+ input.through(INTERMEDIATE_USER_TOPIC)
.groupByKey()
.count(TimeWindows.of(35).advanceBy(10), "count")
.toStream()
@@ -350,8 +340,8 @@ public class ResetIntegrationTest {
public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
return new KeyValue<>(key.window().start() + key.window().end(), value);
}
- });
- windowedCounts.to(Serdes.Long(), Serdes.Long(), outputTopic2);
+ })
+ .to(Serdes.Long(), Serdes.Long(), outputTopic2);
return builder;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d586cb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 3730785..d40956e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -28,6 +28,11 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -51,9 +56,11 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
public class StreamPartitionAssignorTest {
@@ -812,7 +819,73 @@ public class StreamPartitionAssignorTest {
final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
final Cluster cluster = partitionAssignor.clusterMetadata();
assertNotNull(cluster);
+ }
+
+ @Test
+ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
+ final String applicationId = "application-id";
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.setApplicationId(applicationId);
+
+ KStream<Object, Object> stream1 = builder
+ .stream("topic1")
+ .selectKey(new KeyValueMapper<Object, Object, Object>() {
+ @Override
+ public Object apply(Object key, Object value) {
+ return null;
+ }
+ })
+ .through("topic2");
+ builder
+ .stream("unknownTopic")
+ .selectKey(new KeyValueMapper<Object, Object, Object>() {
+ @Override
+ public Object apply(Object key, Object value) {
+ return null;
+ }
+ })
+ .join(
+ stream1,
+ new ValueJoiner() {
+ @Override
+ public Object apply(Object value1, Object value2) {
+ return null;
+ }
+ },
+ JoinWindows.of(0)
+ );
+
+ final UUID uuid = UUID.randomUUID();
+ final String client = "client1";
+
+ final StreamsConfig config = new StreamsConfig(configProps());
+ final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
+
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+ subscriptions.put(
+ client,
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("unknownTopic"),
+ new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+ )
+ );
+
+ final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+ final List<TopicPartition> expectedAssignment = Arrays.asList(
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1),
+ new TopicPartition("topic1", 2),
+ new TopicPartition("topic2", 0),
+ new TopicPartition("topic2", 1),
+ new TopicPartition("topic2", 2)
+ );
+ assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
}
private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {