You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/29 21:17:06 UTC

[kafka] branch trunk updated: MINOR: Move KTable source topic for changelog to optimization framework (#6500)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 369d89f  MINOR: Move KTable source topic for changelog to optimization framework (#6500)
369d89f is described below

commit 369d89f2080586773b5a3cc432cbcf655aa1f625
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Mar 29 17:16:56 2019 -0400

    MINOR: Move KTable source topic for changelog to optimization framework (#6500)
    
    Since Kafka Streams optimizations were added in 2.1, we need to move the optimization for source KTable nodes (using the source topic as the changelog) into the optimization framework.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  9 ++++++
 .../kstream/internals/graph/TableSourceNode.java   | 12 +++++++-
 .../internals/InternalTopologyBuilder.java         | 36 +---------------------
 .../apache/kafka/streams/StreamsBuilderTest.java   |  2 +-
 .../integration/RestoreIntegrationTest.java        |  2 +-
 5 files changed, 23 insertions(+), 38 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c06b988..920f213 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -61,6 +61,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
     private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
     private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
+    private final LinkedHashSet<TableSourceNode<?, ?>> tableSourceNodes = new LinkedHashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -254,6 +255,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
         } else if (node.isMergeNode()) {
             mergeNodes.add(node);
+        } else if (node instanceof TableSourceNode) {
+            tableSourceNodes.add((TableSourceNode<?, ?>) node);
         }
     }
 
@@ -292,10 +295,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
         if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
         }
     }
 
+    private void optimizeKTableSourceTopics() {
+        LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
+        tableSourceNodes.forEach(node -> node.reuseSourceTopicForChangeLog(true));
+    }
+
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 53061dc..fa979b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -38,6 +38,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
     private final ProcessorParameters<K, V> processorParameters;
     private final String sourceName;
     private final boolean isGlobalKTable;
+    private boolean shouldReuseSourceTopicForChangelog = false;
 
     private TableSourceNode(final String nodeName,
                             final String sourceName,
@@ -57,6 +58,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
         this.materializedInternal = materializedInternal;
     }
 
+    /** If {@code true}, the source topic is used as the store's changelog and the store's logging is disabled. */
+    public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicForChangelog) {
+        this.shouldReuseSourceTopicForChangelog = shouldReuseSourceTopicForChangelog;
+    }
+
     @Override
     public String toString() {
         return "TableSourceNode{" +
@@ -104,7 +110,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
             final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
             if (ktableSource.queryableName() != null) {
                 topologyBuilder.addStateStore(storeBuilder, nodeName());
-                topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+
+                if (shouldReuseSourceTopicForChangelog) {
+                    storeBuilder.withLoggingDisabled();
+                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+                }
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 334adce..792df53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,9 +123,6 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    // TODO: this is only temporary for 2.0 and should be removed
-    private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
-
     public static class StateStoreFactory {
         private final StoreBuilder builder;
         private final Set<String> users = new HashSet<>();
@@ -359,10 +356,6 @@ public class InternalTopologyBuilder {
             globalStateStores.put(storeBuilder.name(), storeBuilder.build());
         }
 
-        // adjust the topology if optimization is turned on.
-        // TODO: to be removed post 2.0
-        adjust(config);
-
         return this;
     }
 
@@ -606,7 +599,7 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    private void connectSourceStoreAndTopic(final String sourceStoreName,
+    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                             final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " is already added.");
@@ -614,14 +607,6 @@ public class InternalTopologyBuilder {
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
-    public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
-                                              final String topic) {
-        if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
-            throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
-        }
-        storeToSourceChangelogTopic.put(storeBuilder, topic);
-    }
-
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
@@ -1071,25 +1056,6 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
-    // Adjust the generated topology based on the configs.
-    // Not exposed as public API and should be removed post 2.0
-    private void adjust(final StreamsConfig config) {
-        final boolean enableOptimization20 =
-            config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
-
-        if (enableOptimization20) {
-            for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
-                final StoreBuilder storeBuilder = entry.getKey();
-                final String topicName = entry.getValue();
-
-                // update store map to disable logging for this store
-                storeBuilder.withLoggingDisabled();
-                addStateStore(storeBuilder, true);
-                connectSourceStoreAndTopic(storeBuilder.name(), topicName);
-            }
-        }
-    }
-
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b51eac8..f140546 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -349,9 +349,9 @@ public class StreamsBuilderTest {
     public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
-        final Topology topology = builder.build();
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final Topology topology = builder.build(props);
 
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
         internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 830514b..f21dbfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -146,7 +146,7 @@ public class RestoreIntegrationTest {
                     }
                 });
 
-        kafkaStreams = new KafkaStreams(builder.build(), props);
+        kafkaStreams = new KafkaStreams(builder.build(props), props);
         kafkaStreams.setStateListener((newState, oldState) -> {
             if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                 startupLatch.countDown();