You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/20 00:41:38 UTC

[kafka] branch trunk updated: KAFKA-10168: fix StreamsConfig parameter name variable (#8865)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 712cc5d  KAFKA-10168: fix StreamsConfig parameter name variable (#8865)
712cc5d is described below

commit 712cc5d073da7595dab836c95372c13fc61047ee
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Jun 19 17:41:06 2020 -0700

    KAFKA-10168: fix StreamsConfig parameter name variable (#8865)
    
    Implements KIP-626.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, John Roesler <jo...@confluent.io>
---
 docs/streams/upgrade-guide.html                           |  8 ++++++++
 .../main/java/org/apache/kafka/streams/StreamsConfig.java | 15 +++++++++++----
 .../java/org/apache/kafka/streams/kstream/KStream.java    |  8 ++++----
 .../streams/kstream/internals/InternalStreamsBuilder.java |  2 +-
 .../java/org/apache/kafka/streams/StreamsBuilderTest.java |  2 +-
 .../java/org/apache/kafka/streams/StreamsConfigTest.java  | 10 +++++-----
 .../integration/KStreamRepartitionIntegrationTest.java    |  2 +-
 .../KTableKTableForeignKeyJoinIntegrationTest.java        |  2 +-
 .../streams/integration/LagFetchIntegrationTest.java      |  2 +-
 .../integration/OptimizedKTableIntegrationTest.java       |  2 +-
 .../kafka/streams/integration/RestoreIntegrationTest.java |  2 +-
 .../integration/StandbyTaskCreationIntegrationTest.java   |  4 ++--
 .../streams/integration/StoreQueryIntegrationTest.java    |  2 +-
 .../kafka/streams/kstream/RepartitionTopicNamingTest.java |  6 +++---
 .../kstream/internals/CogroupedKStreamImplTest.java       |  8 ++++----
 .../streams/kstream/internals/KStreamKStreamJoinTest.java |  4 ++--
 .../streams/kstream/internals/KStreamKTableJoinTest.java  |  4 ++--
 .../streams/kstream/internals/graph/StreamsGraphTest.java | 12 ++++++------
 .../processor/internals/RepartitionOptimizingTest.java    |  2 +-
 .../internals/RepartitionWithMergeOptimizingTest.java     |  2 +-
 .../processor/internals/StreamsPartitionAssignorTest.java |  2 +-
 .../org/apache/kafka/streams/scala/TopologyTest.scala     |  4 ++--
 22 files changed, 60 insertions(+), 45 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b1af752..be7a95b 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,6 +86,14 @@
         More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
     </p>
 
+    <h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
+    <p>
+        The <code>StreamsConfig</code> variable for configuration parameter <code>"topology.optimization"</code>
+        is renamed from <code>TOPOLOGY_OPTIMIZATION</code> to <code>TOPOLOGY_OPTIMIZATION_CONFIG</code>.
+        The old variable is deprecated. Note that the parameter name itself is not affected.
+        (Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-626</a>.)
+    </p>
+
     <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
     <p>
         We added a new processing mode that improves application scalability using exactly-once guarantees
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f9e75fe..af65fd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -207,12 +207,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ADMIN_CLIENT_PREFIX = "admin.";
 
     /**
-     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
      */
     public static final String NO_OPTIMIZATION = "none";
 
     /**
-     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for enabling topology optimization
      */
     public static final String OPTIMIZE = "all";
 
@@ -524,7 +524,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
 
     /** {@code topology.optimization} */
-    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
     /** {@code upgrade.from} */
@@ -552,6 +552,13 @@ public class StreamsConfig extends AbstractConfig {
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface." +
         " WARNING: This config is deprecated and will be removed in 3.0.0 release.";
 
+    /**
+     * {@code topology.optimization}
+     * @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
+     */
+    @Deprecated
+    public static final String TOPOLOGY_OPTIMIZATION = TOPOLOGY_OPTIMIZATION_CONFIG;
+
 
     private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
         new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
@@ -664,7 +671,7 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-            .define(TOPOLOGY_OPTIMIZATION,
+            .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                     Type.STRING,
                     NO_OPTIMIZATION,
                     in(NO_OPTIMIZATION, OPTIMIZE),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index bcc911c..f5f02dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -933,7 +933,7 @@ public interface KStream<K, V> {
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
      * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
      * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
      * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
      * <p>
@@ -959,7 +959,7 @@ public interface KStream<K, V> {
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
      * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
      * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
      * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
      * <p>
@@ -986,7 +986,7 @@ public interface KStream<K, V> {
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
      * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
      * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
      * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
      * <p>
@@ -1014,7 +1014,7 @@ public interface KStream<K, V> {
      * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
      * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
      * correctly on its key.
-     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+     * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
      * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
      * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
      * <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e6aa2af..f859db8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -317,7 +317,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     private void maybePerformOptimizations(final Properties props) {
 
-        if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
+        if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG))) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index f77b3e5..bdc60a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -397,7 +397,7 @@ public class StreamsBuilderTest {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
         final Properties props = StreamsTestUtils.getStreamsConfig();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final Topology topology = builder.build(props);
 
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 92d2881..1745cc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -47,7 +47,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
-import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
+import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -895,22 +895,22 @@ public class StreamsConfigTest {
     @Test
     public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
         final String expectedOptimizeConfig = "none";
-        final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION);
+        final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
         assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
     }
 
     @Test
     public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
         final String expectedOptimizeConfig = "all";
-        props.put(TOPOLOGY_OPTIMIZATION, "all");
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
         final StreamsConfig config = new StreamsConfig(props);
-        final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION);
+        final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
         assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
     }
 
     @Test(expected = ConfigException.class)
     public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
-        props.put(TOPOLOGY_OPTIMIZATION, "maybe");
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "maybe");
         new StreamsConfig(props);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 03410e0..a12bb85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -131,7 +131,7 @@ public class KStreamRepartitionIntegrationTest {
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization);
+        streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 5ebd9ef..d774fa6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -95,7 +95,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-            mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
+            mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization)
         ));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 2e6d6c0..d288c3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -156,7 +156,7 @@ public class LagFetchIntegrationTest {
             props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, FallbackPriorTaskAssignor.class.getName());
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i);
             props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
-            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization);
+            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization);
             props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
             props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index fbf40e2..2507436 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -180,7 +180,7 @@ public class OptimizedKTableIntegrationTest {
     private Properties streamsConfiguration() {
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties config = new Properties();
-        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index faac172..0dad8c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -135,7 +135,7 @@ public class RestoreIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties props = props();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 3bf2d8f..79f14b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -127,9 +127,9 @@ public class StandbyTaskCreationIntegrationTest {
     @Test
     public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
         final Properties streamsConfiguration1 = streamsConfiguration();
-        streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final Properties streamsConfiguration2 = streamsConfiguration();
-        streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index fb73101..ef34def 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -389,7 +389,7 @@ public class StoreQueryIntegrationTest {
     private Properties streamsConfiguration() {
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties config = new Properties();
-        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 5638dbd..c286b14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -192,7 +192,7 @@ public class RepartitionTopicNamingTest {
         kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
         kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final Topology topology = builder.build(properties);
         assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1));
     }
@@ -224,7 +224,7 @@ public class RepartitionTopicNamingTest {
     public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
@@ -500,7 +500,7 @@ public class RepartitionTopicNamingTest {
 
         final Properties properties = new Properties();
 
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
         return builder.build(properties);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
index d6195be..d780fa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -379,7 +379,7 @@ public class CogroupedKStreamImplTest {
     @Test
     public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
@@ -528,7 +528,7 @@ public class CogroupedKStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
         final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
@@ -653,7 +653,7 @@ public class CogroupedKStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
         final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
@@ -707,7 +707,7 @@ public class CogroupedKStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
         final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 56190ee..4dd7f70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -84,7 +84,7 @@ public class KStreamKStreamJoinTest {
     public void shouldReuseRepartitionTopicWithGeneratedName() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
         final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
@@ -98,7 +98,7 @@ public class KStreamKStreamJoinTest {
     public void shouldCreateRepartitionTopicsWithUserProvidedName() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
         final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index a5368cd..c506779 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -118,7 +118,7 @@ public class KStreamKTableJoinTest {
     public void shouldReuseRepartitionTopicWithGeneratedName() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
         final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
@@ -133,7 +133,7 @@ public class KStreamKTableJoinTest {
     public void shouldCreateRepartitionTopicsWithUserProvidedName() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties props = new Properties();
-        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
         final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
         final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index d8e8303..9601602 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -89,7 +89,7 @@ public class StreamsGraphTest {
         final Properties properties = new Properties();
         properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
         properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> inputStream = builder.stream("inputTopic");
@@ -118,7 +118,7 @@ public class StreamsGraphTest {
         final Properties properties = new Properties();
         properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
         properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();
         initializer = () -> "";
@@ -222,7 +222,7 @@ public class StreamsGraphTest {
             .to("output_topic");
 
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final Topology topology = streamsBuilder.build(properties);
 
         assertEquals(expectedMergeOptimizedTopology, topology.describe().toString());
@@ -242,7 +242,7 @@ public class StreamsGraphTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
 
         final KStream<String, String> inputStream = builder.stream("input");
         final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
@@ -259,7 +259,7 @@ public class StreamsGraphTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
 
         final KStream<String, String> inputStream = builder.stream("input");
         final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
@@ -274,7 +274,7 @@ public class StreamsGraphTest {
     private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
 
         final KStream<String, String> inputStream = builder.<String, String>stream("input").selectKey((k, v) -> k + v);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 3dcb2a3..6811198 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -200,7 +200,7 @@ public class RepartitionOptimizingTest {
                                                     .withOtherValueSerde(Serdes.Long()))
             .to(JOINED_TOPIC, Produced.as("join-to"));
 
-        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
         final Topology topology = builder.build(streamsConfiguration);
 
         topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index bd2656c..b50ef8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -120,7 +120,7 @@ public class RepartitionWithMergeOptimizingTest {
 
     private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
 
-        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
 
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b3d4169..a2c2b46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1941,7 +1941,7 @@ public class StreamsPartitionAssignorTest {
 
         createDefaultMockTaskManager();
         EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
-        configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE));
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
         overwriteInternalTopicManagerWithMock(false);
 
         EasyMock.expect(consumerClient.committed(changelogs))
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 3107db6..3cf0059 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -335,10 +335,10 @@ class TopologyTest {
   def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {
 
     val props = new Properties()
-    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
+    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE)
 
     val propsNoOptimization = new Properties()
-    propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)
+    propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION)
 
     val AGGREGATION_TOPIC = "aggregationTopic"
     val REDUCE_TOPIC = "reduceTopic"