You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/02 18:07:14 UTC
[kafka] branch 2.6 updated: KAFKA-10689: fix windowed FKJ topology
and put checks in assignor to avoid infinite loops (#10020)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 69a07a7 KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#10020)
69a07a7 is described below
commit 69a07a780a1f98f5b2a111eeb492ca61dae489db
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Feb 2 10:05:44 2021 -0800
KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#10020)
Backport of #9568 to 2.6 branch.
Fix infinite loop in assignor when trying to resolve the number of partitions in a topology with a windowed FKJ. Also adds a check to this loop to break out and fail the application if we detect that we are/will be stuck in an infinite loop
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>
---
checkstyle/suppressions.xml | 2 +-
.../kstream/internals/graph/StreamSinkNode.java | 19 +++---
.../internals/StreamsPartitionAssignor.java | 29 +++++----
.../integration/InternalTopicIntegrationTest.java | 39 +++++++++++-
.../internals/StreamsPartitionAssignorTest.java | 74 +++++++++++++++++++++-
5 files changed, 138 insertions(+), 25 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 446533a..f2f642c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
- files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
+ files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
<suppress checks="MethodLength"
files="KStreamKTableJoinIntegrationTest.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 40ce357..ec211f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -51,21 +51,24 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
}
@Override
+ @SuppressWarnings("unchecked")
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final Serializer<K> keySerializer = producedInternal.keySerde() == null ? null : producedInternal.keySerde().serializer();
final Serializer<V> valSerializer = producedInternal.valueSerde() == null ? null : producedInternal.valueSerde().serializer();
- final StreamPartitioner<? super K, ? super V> partitioner = producedInternal.streamPartitioner();
final String[] parentNames = parentNodeNames();
- if (partitioner == null && keySerializer instanceof WindowedSerializer) {
- @SuppressWarnings("unchecked")
- final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
- topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
- } else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
- final String topicName = ((StaticTopicNameExtractor) topicNameExtractor).topicName;
+ final StreamPartitioner<? super K, ? super V> partitioner;
+ if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) {
+ partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
+ } else {
+ partitioner = producedInternal.streamPartitioner();
+ }
+
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+ final String topicName = ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
topologyBuilder.addSink(nodeName(), topicName, keySerializer, valSerializer, partitioner, parentNames);
} else {
- topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
+ topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index ef4bca9..b6556f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -522,10 +522,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
boolean numPartitionsNeeded;
do {
numPartitionsNeeded = false;
+ boolean progressMadeThisIteration = false; // avoid infinitely looping without making any progress on unknown repartitions
for (final TopicsInfo topicsInfo : topicGroups.values()) {
- for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
- final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
+ for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
+ final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic)
.numberOfPartitions();
Integer numPartitions = null;
@@ -534,24 +535,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
- if (otherSinkTopics.contains(topicName)) {
+ if (otherSinkTopics.contains(repartitionSourceTopic)) {
// if this topic is one of the sink topics of this topology,
// use the maximum of all its source topic partitions as the number of partitions
- for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+ for (final String upstreamSourceTopic : otherTopicsInfo.sourceTopics) {
Integer numPartitionsCandidate = null;
// It is possible the sourceTopic is another internal topic, i.e,
// map().join().join(map())
- if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
- if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
+ if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+ if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) {
numPartitionsCandidate =
- repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+ repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
}
} else {
- final Integer count = metadata.partitionCountForTopic(sourceTopicName);
+ final Integer count = metadata.partitionCountForTopic(upstreamSourceTopic);
if (count == null) {
throw new IllegalStateException(
"No partition count found for source topic "
- + sourceTopicName
+ + upstreamSourceTopic
+ ", but it should have been."
);
}
@@ -567,16 +568,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
- // if we still have not found the right number of partitions,
- // another iteration is needed
if (numPartitions == null) {
numPartitionsNeeded = true;
+ log.trace("Unable to determine number of partitions for {}, another iteration is needed",
+ repartitionSourceTopic);
} else {
- repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
+ repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+ progressMadeThisIteration = true;
}
}
}
}
+ if (!progressMadeThisIteration && numPartitionsNeeded) {
+ throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+ }
} while (numPartitionsNeeded);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 0a71481..8f57dc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import java.time.Duration;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import org.apache.kafka.clients.admin.Admin;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -59,6 +62,8 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -73,6 +78,7 @@ public class InternalTopicIntegrationTest {
private static final String APP_ID = "internal-topics-integration-test";
private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+ private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
private final MockTime mockTime = CLUSTER.time;
@@ -80,7 +86,7 @@ public class InternalTopicIntegrationTest {
@BeforeClass
public static void startKafkaCluster() throws InterruptedException {
- CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
+ CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
}
@Before
@@ -136,6 +142,37 @@ public class InternalTopicIntegrationTest {
return Admin.create(adminClientConfig);
}
+ /*
+ * This test just ensures that the assignor does not get stuck during partition number resolution
+ * for internal repartition topics. See KAFKA-10689
+ */
+ @Test
+ public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
+ final String appID = APP_ID + "-windowed-FKJ";
+ streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
+ final KTable<String, String> inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
+ inputTopic
+ .groupBy(
+ (k, v) -> k,
+ Grouped.with("GroupName", Serdes.String(), Serdes.String())
+ )
+ .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+ .aggregate(
+ () -> "",
+ (k, v, a) -> a + k)
+ .leftJoin(
+ inputTable,
+ v -> v,
+ (x, y) -> x + y
+ );
+
+ final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsProp);
+ startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(60));
+ }
+
@Test
public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
final String appID = APP_ID + "-compact";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b75d278..062fcd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.SortedSet;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -35,24 +34,31 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
@@ -68,6 +74,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -77,6 +84,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +95,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -1079,7 +1088,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.verify(streamsMetadataState);
EasyMock.verify(taskManager);
- assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics());
+ assertEquals(singleton(t3p0.topic()), capturedCluster.getValue().topics());
assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
}
@@ -1987,6 +1996,65 @@ public class StreamsPartitionAssignorTest {
assertEquals(-128, partitionAssignor.uniqueField());
}
+ @Test
+ public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
+ builder = new CorruptedInternalTopologyBuilder();
+ final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
+
+ final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
+ final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as("store")));
+ inputTopic
+ .groupBy(
+ (k, v) -> k,
+ Grouped.with("GroupName", Serdes.String(), Serdes.String())
+ )
+ .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+ .aggregate(
+ () -> "",
+ (k, v, a) -> a + k)
+ .leftJoin(
+ inputTable,
+ v -> v,
+ (x, y) -> x + y
+ );
+ streamsBuilder.buildAndOptimizeTopology();
+
+ configureDefault();
+
+ subscriptions.put("consumer",
+ new Subscription(
+ singletonList("topic"),
+ defaultSubscriptionInfo.encode()
+ ));
+ final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+ assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+ equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
+ }
+
+ private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
+ private Map<Integer, TopicsInfo> corruptedTopicGroups;
+
+ @Override
+ public synchronized Map<Integer, TopicsInfo> topicGroups() {
+ if (corruptedTopicGroups == null) {
+ corruptedTopicGroups = new HashMap<>();
+ for (final Map.Entry<Integer, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+ final TopicsInfo originalInfo = topicGroupEntry.getValue();
+ corruptedTopicGroups.put(
+ topicGroupEntry.getKey(),
+ new TopicsInfo(
+ emptySet(),
+ originalInfo.sourceTopics,
+ originalInfo.repartitionSourceTopics,
+ originalInfo.stateChangelogTopics
+ ));
+ }
+ }
+
+ return corruptedTopicGroups;
+ }
+ }
+
private static ByteBuffer encodeFutureSubscription() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
buf.putInt(LATEST_SUPPORTED_VERSION + 1);