You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/07 16:16:37 UTC

[kafka] branch 2.0 updated: KAFKA-6935: Add config for allowing optional optimization (#5071)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 986ba6a  KAFKA-6935: Add config for allowing optional optimization (#5071)
986ba6a is described below

commit 986ba6a691fd1b92717095c68837f2051e4c4a59
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Jun 6 21:11:03 2018 -0400

    KAFKA-6935: Add config for allowing optional optimization (#5071)
    
    Adding configuration to StreamsConfig allowing for making topology optimization optional.
    
    Added unit tests verify the default value, setting a valid value, and failure on an invalid value.
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 24 ++++++++++++++++++++--
 .../apache/kafka/streams/StreamsConfigTest.java    | 22 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bc54996..cd9be33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -32,10 +32,10 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -203,6 +203,16 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ADMIN_CLIENT_PREFIX = "admin.";
 
     /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
+     */
+    public static final String NO_OPTIMIZATION = "none";
+
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
+     */
+    public static final String OPTIMIZE = "all";
+
+    /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
      */
     @SuppressWarnings("WeakerAccess")
@@ -434,6 +444,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
 
+    /** {@code topology.optimization} */
+    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
+
     /** {@code upgrade.from} */
     @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
@@ -534,6 +548,12 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            .define(TOPOLOGY_OPTIMIZATION,
+                    Type.STRING,
+                    NO_OPTIMIZATION,
+                    in(NO_OPTIMIZATION, OPTIMIZE),
+                    Importance.MEDIUM,
+                    TOPOLOGY_OPTIMIZATION_DOC)
 
             // LOW
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index e991b6f..cdd4d09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -45,6 +45,7 @@ import java.util.Properties;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -583,6 +584,27 @@ public class StreamsConfigTest {
         }
     }
 
+    @Test
+    public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
+        final String expectedOptimizeConfig = "none";
+        final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION);
+        assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
+    }
+
+    @Test
+    public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
+        final String expectedOptimizeConfig = "all";
+        props.put(TOPOLOGY_OPTIMIZATION, "all");
+        final StreamsConfig config = new StreamsConfig(props);
+        final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION);
+        assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
+        props.put(TOPOLOGY_OPTIMIZATION, "maybe");
+        new StreamsConfig(props);
+    }
 
     static class MisconfiguredSerde implements Serde {
         @Override

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.