You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/02 18:07:14 UTC

[kafka] branch 2.6 updated: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#10020)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 69a07a7  KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#10020)
69a07a7 is described below

commit 69a07a780a1f98f5b2a111eeb492ca61dae489db
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Feb 2 10:05:44 2021 -0800

    KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#10020)
    
    Backport of #9568 to 2.6 branch.
    
    Fix an infinite loop in the assignor when resolving the number of partitions for a topology with a windowed foreign-key join (FKJ). Also add a check to this loop that breaks out and fails the application if it detects that it is (or will be) stuck in an infinite loop.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kstream/internals/graph/StreamSinkNode.java    | 19 +++---
 .../internals/StreamsPartitionAssignor.java        | 29 +++++----
 .../integration/InternalTopicIntegrationTest.java  | 39 +++++++++++-
 .../internals/StreamsPartitionAssignorTest.java    | 74 +++++++++++++++++++++-
 5 files changed, 138 insertions(+), 25 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 446533a..f2f642c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
+              files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
 
     <suppress checks="MethodLength"
               files="KStreamKTableJoinIntegrationTest.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 40ce357..ec211f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -51,21 +51,24 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final Serializer<K> keySerializer = producedInternal.keySerde() == null ? null : producedInternal.keySerde().serializer();
         final Serializer<V> valSerializer = producedInternal.valueSerde() == null ? null : producedInternal.valueSerde().serializer();
-        final StreamPartitioner<? super K, ? super V> partitioner = producedInternal.streamPartitioner();
         final String[] parentNames = parentNodeNames();
 
-        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
-            @SuppressWarnings("unchecked")
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
-        } else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-            final String topicName = ((StaticTopicNameExtractor) topicNameExtractor).topicName;
+        final StreamPartitioner<? super K, ? super V> partitioner;
+        if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) {
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
+        } else {
+            partitioner = producedInternal.streamPartitioner();
+        }
+
+        if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+            final String topicName = ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
             topologyBuilder.addSink(nodeName(), topicName, keySerializer, valSerializer, partitioner, parentNames);
         } else {
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner,  parentNames);
+            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index ef4bca9..b6556f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -522,10 +522,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
+            boolean progressMadeThisIteration = false;  // avoid infinitely looping without making any progress on unknown repartitions
 
             for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
+                for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
+                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic)
                                                                      .numberOfPartitions();
                     Integer numPartitions = null;
 
@@ -534,24 +535,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                         for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
-                            if (otherSinkTopics.contains(topicName)) {
+                            if (otherSinkTopics.contains(repartitionSourceTopic)) {
                                 // if this topic is one of the sink topics of this topology,
                                 // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                                for (final String upstreamSourceTopic : otherTopicsInfo.sourceTopics) {
                                     Integer numPartitionsCandidate = null;
                                     // It is possible the sourceTopic is another internal topic, i.e,
                                     // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
+                                    if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+                                        if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) {
                                             numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+                                                repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
                                         }
                                     } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
+                                        final Integer count = metadata.partitionCountForTopic(upstreamSourceTopic);
                                         if (count == null) {
                                             throw new IllegalStateException(
                                                 "No partition count found for source topic "
-                                                    + sourceTopicName
+                                                    + upstreamSourceTopic
                                                     + ", but it should have been."
                                             );
                                         }
@@ -567,16 +568,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                             }
                         }
 
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
                         if (numPartitions == null) {
                             numPartitionsNeeded = true;
+                            log.trace("Unable to determine number of partitions for {}, another iteration is needed",
+                                      repartitionSourceTopic);
                         } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
+                            repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+                            progressMadeThisIteration = true;
                         }
                     }
                 }
             }
+            if (!progressMadeThisIteration && numPartitionsNeeded) {
+                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+            }
         } while (numPartitionsNeeded);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 0a71481..8f57dc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.admin.Admin;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -59,6 +62,8 @@ import java.util.concurrent.TimeUnit;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,6 +78,7 @@ public class InternalTopicIntegrationTest {
 
     private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
 
     private final MockTime mockTime = CLUSTER.time;
 
@@ -80,7 +86,7 @@ public class InternalTopicIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
     }
 
     @Before
@@ -136,6 +142,37 @@ public class InternalTopicIntegrationTest {
         return Admin.create(adminClientConfig);
     }
 
+    /*
+     * This test just ensures that the assignor does not get stuck during partition number resolution
+     * for internal repartition topics. See KAFKA-10689
+     */
+    @Test
+    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
+        final KTable<String, String> inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+
+        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsProp);
+        startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(60));
+    }
+
     @Test
     public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
         final String appID = APP_ID + "-compact";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b75d278..062fcd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.SortedSet;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -35,24 +34,31 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
@@ -68,6 +74,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -77,6 +84,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +95,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -1079,7 +1088,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(streamsMetadataState);
         EasyMock.verify(taskManager);
 
-        assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics());
+        assertEquals(singleton(t3p0.topic()), capturedCluster.getValue().topics());
         assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
     }
 
@@ -1987,6 +1996,65 @@ public class StreamsPartitionAssignorTest {
         assertEquals(-128, partitionAssignor.uniqueField());
     }
 
+    @Test
+    public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
+        builder = new CorruptedInternalTopologyBuilder();
+        final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
+
+        final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
+        final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as("store")));
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+        streamsBuilder.buildAndOptimizeTopology();
+
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+        assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
+    }
+
+    private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
+        private Map<Integer, TopicsInfo> corruptedTopicGroups;
+
+        @Override
+        public synchronized Map<Integer, TopicsInfo> topicGroups() {
+            if (corruptedTopicGroups == null) {
+                corruptedTopicGroups = new HashMap<>();
+                for (final Map.Entry<Integer, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+                    final TopicsInfo originalInfo = topicGroupEntry.getValue();
+                    corruptedTopicGroups.put(
+                        topicGroupEntry.getKey(),
+                        new TopicsInfo(
+                            emptySet(),
+                            originalInfo.sourceTopics,
+                            originalInfo.repartitionSourceTopics,
+                            originalInfo.stateChangelogTopics
+                        ));
+                }
+            }
+
+            return corruptedTopicGroups;
+        }
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);