Posted to commits@kafka.apache.org by vv...@apache.org on 2022/09/22 16:21:05 UTC

[kafka] branch trunk updated: KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cda5da9b65f KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)
cda5da9b65f is described below

commit cda5da9b65f78b51cdfe5371f712a0d392dbdb4d
Author: Vicky Papavasileiou <vp...@users.noreply.github.com>
AuthorDate: Thu Sep 22 17:20:37 2022 +0100

    KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)
    
    This PR is part of a series implementing the self-join rewriting. As part of it, we decided to clean up the TOPOLOGY_OPTIMIZATION_CONFIG and make it accept a list of optimization rules. Acceptable values are NO_OPTIMIZATION, OPTIMIZE (which applies all optimization rules), or a comma-separated list of specific optimizations.
    
    Reviewers: Guozhang Wang <gu...@apache.org>, John Roesler <vv...@apache.org>
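
    For illustration, a minimal sketch of opting into a specific subset of rules with
    the reworked config; the application id, bootstrap servers, and topology contents
    are placeholders, not part of the change itself:

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.Topology;

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Enable only the named rules rather than OPTIMIZE ("all") or NO_OPTIMIZATION ("none"):
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                  String.join(",",
                              StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
                              StreamsConfig.MERGE_REPARTITION_TOPICS));

        final StreamsBuilder builder = new StreamsBuilder();
        // ... define the topology ...
        // Passing the same props to build() lets the builder apply the requested rules:
        final Topology topology = builder.build(props);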
---
 docs/streams/developer-guide/config-streams.html   | 13 +++--
 .../org/apache/kafka/streams/StreamsBuilder.java   |  6 +--
 .../org/apache/kafka/streams/StreamsConfig.java    | 59 ++++++++++++++++++--
 .../kstream/internals/InternalStreamsBuilder.java  | 39 ++++++++++----
 .../apache/kafka/streams/StreamsConfigTest.java    | 63 ++++++++++++++++++++++
 5 files changed, 157 insertions(+), 23 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 0aee6b6e1dd..fcc0ea9993f 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -424,8 +424,8 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);</code></pre>
           </tr>
           <tr class="row-even"><td>topology.optimization</td>
             <td>Medium</td>
-            <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology</td>
-            <td>none</td>
+            <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge. [...]
+            <td><code> NO_OPTIMIZATION</code></td>
           </tr>
           <tr class="row-odd"><td>upgrade.from</td>
             <td>Medium</td>
@@ -942,8 +942,13 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
       <blockquote>
         <div>
           <p>
-            You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
-            These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
+            A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topi [...]
+          </p>
+          <p>
+            We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
+          </p>
+          <p>
+            These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
           </p>
           <p>
             Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
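
The truncated note above refers to passing the same Properties into
StreamsBuilder#build(Properties), as shown in the StreamsBuilder diff below.
A minimal sketch of the two steps, with the topology definition itself omitted:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    final Properties settings = new Properties();
    // Step 1: request all optimization rules ("all").
    settings.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

    // Step 2: hand the same Properties to build() so the optimizer actually sees them.
    final StreamsBuilder builder = new StreamsBuilder();  // topology definition omitted
    final Topology topology = builder.build(settings);
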
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index e913728984e..39ae3821450 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -621,11 +621,7 @@ public class StreamsBuilder {
      * @return the {@link Topology} that represents the specified processing logic
      */
     public synchronized Topology build(final Properties props) {
-        final boolean optimizeTopology =
-            props != null &&
-            StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
-
-        internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology);
+        internalStreamsBuilder.buildAndOptimizeTopology(props);
         return topology;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 325cbec90d9..183f305a12c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -233,6 +235,18 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
+    /** {@code topology.optimization} */
+    private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+        + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+        + "or a comma separated list of specific optimizations: "
+        + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\").";
+
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply. "
+        + CONFIG_ERROR_MSG
+        + "\"NO_OPTIMIZATION\" by default.";
+
     /**
      * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
      */
@@ -243,6 +257,22 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String OPTIMIZE = "all";
 
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+     * for enabling the specific optimization that reuses source topic as changelog topic
+     * for KTables.
+     */
+    public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
+
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+     * for enabling the specific optimization that merges duplicated repartition topics.
+     */
+    public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
+
+    private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS = Arrays.asList(
+        OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS);
+
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
      */
@@ -663,9 +693,6 @@ public class StreamsConfig extends AbstractConfig {
         "For a timeout of 0ms, a task would raise an error for the first internal error. " +
         "For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
 
-    /** {@code topology.optimization} */
-    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
-    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
     /** {@code window.size.ms} */
     public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
@@ -845,7 +872,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                     Type.STRING,
                     NO_OPTIMIZATION,
-                    in(NO_OPTIMIZATION, OPTIMIZE),
+                    (name, value) -> verifyTopologyOptimizationConfigs((String) value),
                     Importance.MEDIUM,
                     TOPOLOGY_OPTIMIZATION_DOC)
 
@@ -1265,6 +1292,7 @@ public class StreamsConfig extends AbstractConfig {
         if (eosEnabled) {
             verifyEOSTransactionTimeoutCompatibility();
         }
+        verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
     }
 
     private void verifyEOSTransactionTimeoutCompatibility() {
@@ -1653,6 +1681,29 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
+    public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
+        final Set<String> verifiedConfigs = new HashSet<>();
+        // Verify it doesn't contain none or all plus a list of optimizations
+        if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
+            if (configs.size() > 1) {
+                throw new ConfigException("\"" + config + "\" is not a valid optimization config. " + CONFIG_ERROR_MSG);
+            }
+        }
+        for (final String conf: configs) {
+            if (!TOPOLOGY_OPTIMIZATION_CONFIGS.contains(conf)) {
+                throw new ConfigException("Unrecognized config. " + CONFIG_ERROR_MSG);
+            }
+        }
+        if (configs.contains(OPTIMIZE)) {
+            verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
+            verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
+        } else if (!configs.contains(NO_OPTIMIZATION)) {
+            verifiedConfigs.addAll(configs);
+        }
+        return verifiedConfigs;
+    }
+
     /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
      * class}.
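
A rough sketch of how the new helper resolves the different value shapes; the demo
class name is hypothetical, and the comments describe set contents rather than
iteration order:

    import java.util.Set;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.streams.StreamsConfig;

    public class OptimizationConfigDemo {   // hypothetical demo class
        public static void main(final String[] args) {
            // "all" expands to every known rule:
            final Set<String> all =
                StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
            // -> contains reuse.ktable.source.topics and merge.repartition.topics

            // "none" resolves to an empty set, so no rewrite will run:
            final Set<String> none =
                StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION);
            // -> none.isEmpty() == true

            // A comma separated list of known rules is validated and returned:
            final Set<String> some = StreamsConfig.verifyTopologyOptimizationConfigs(
                StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
            // -> contains only reuse.ktable.source.topics

            // Mixing "none"/"all" with specific rules, or naming an unknown rule,
            // is rejected with a ConfigException:
            try {
                StreamsConfig.verifyTopologyOptimizationConfigs(
                    "none," + StreamsConfig.MERGE_REPARTITION_TOPICS);
            } catch (final ConfigException expected) {
                // message: "... is not a valid optimization config ..."
            }
        }
    }
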
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index ac9d281be4e..7f6753659a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Properties;
 import java.util.TreeMap;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -270,17 +272,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
-
+    public void buildAndOptimizeTopology(final Properties props) {
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
-            optimizeKTableSourceTopics();
-            maybeOptimizeRepartitionOperations();
-        }
+        optimizeTopology(props);
 
         final PriorityQueue<GraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));
 
@@ -305,6 +302,28 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final Set<String> optimizationConfigs;
+        if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = Collections.emptySet();
+        } else {
+            optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
+                (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+        }
+        if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
+            reuseKTableSourceTopics();
+        }
+        if (optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            mergeRepartitionTopics();
+        }
+    }
+
     private void mergeDuplicateSourceNodes() {
         final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
 
@@ -356,12 +375,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
-    private void optimizeKTableSourceTopics() {
+    private void reuseKTableSourceTopics() {
         LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
         tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));
     }
 
-    private void maybeOptimizeRepartitionOperations() {
+    private void mergeRepartitionTopics() {
         maybeUpdateKeyChangingRepartitionNodeMap();
         final Iterator<Entry<GraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
             keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
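
In effect, the rule set resolved from the config drives which of the rewrites above
run during build. A small sketch, assuming a builder whose topology contains a
key-changing repartition step:

    final Properties buildProps = new Properties();
    // Only merge.repartition.topics is requested, so reuseKTableSourceTopics() is
    // skipped and mergeRepartitionTopics() is the only configured rewrite applied:
    buildProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                   StreamsConfig.MERGE_REPARTITION_TOPICS);
    final Topology topology = builder.build(buildProps);
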
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f..001fdd16b07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -1263,6 +1264,68 @@ public class StreamsConfigTest {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("is not a valid optimization config"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+        final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("is not a valid optimization config"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenOptimizationDoesNotExistInList() {
+        final String value = String.join(",",
+                                         StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                                         "topology.optimization.does.not.exist",
+                                         StreamsConfig.MERGE_REPARTITION_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+        final String value = String.join(",", "topology.optimization.does.not.exist");
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldAllowMultipleOptimizations() {
+        final String value = String.join(",",
+                                         StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                                         StreamsConfig.MERGE_REPARTITION_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final StreamsConfig config = new StreamsConfig(props);
+        final List<String> configs = Arrays.asList(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG).split(","));
+        assertEquals(2, configs.size());
+        assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+        assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+    }
+
+    @Test
+    public void shouldEnableAllOptimizationsWithOptimizeConfig() {
+        final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
+        assertEquals(2, configs.size());
+        assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+        assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+    }
+
+    @Test
+    public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
+        final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION);
+        assertEquals(0, configs.size());
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean isKey) {