You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/07 16:16:37 UTC
[kafka] branch 2.0 updated: KAFKA-6935: Add config for allowing
optional optimization (#5071)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 986ba6a KAFKA-6935: Add config for allowing optional optimization (#5071)
986ba6a is described below
commit 986ba6a691fd1b92717095c68837f2051e4c4a59
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Jun 6 21:11:03 2018 -0400
KAFKA-6935: Add config for allowing optional optimization (#5071)
Adds a configuration to StreamsConfig that makes topology optimization optional.
The added unit tests verify the default value, setting a valid value, and failure on invalid values.
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/StreamsConfig.java | 24 ++++++++++++++++++++--
.../apache/kafka/streams/StreamsConfigTest.java | 22 ++++++++++++++++++++
2 files changed, 44 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bc54996..cd9be33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,8 +16,8 @@
*/
package org.apache.kafka.streams;
-import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -32,10 +32,10 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -203,6 +203,16 @@ public class StreamsConfig extends AbstractConfig {
public static final String ADMIN_CLIENT_PREFIX = "admin.";
/**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
+ */
+ public static final String NO_OPTIMIZATION = "none";
+
+ /**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
+ */
+ public static final String OPTIMIZE = "all";
+
+ /**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@SuppressWarnings("WeakerAccess")
@@ -434,6 +444,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
+ /** {@code topology.optimization} */
+ public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
+ private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
+
/** {@code upgrade.from} */
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
@@ -534,6 +548,12 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .define(TOPOLOGY_OPTIMIZATION,
+ Type.STRING,
+ NO_OPTIMIZATION,
+ in(NO_OPTIMIZATION, OPTIMIZE),
+ Importance.MEDIUM,
+ TOPOLOGY_OPTIMIZATION_DOC)
// LOW
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index e991b6f..cdd4d09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -45,6 +45,7 @@ import java.util.Properties;
import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
@@ -583,6 +584,27 @@ public class StreamsConfigTest {
}
}
+ @Test
+ public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
+ final String expectedOptimizeConfig = "none";
+ final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION);
+ assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
+ }
+
+ @Test
+ public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
+ final String expectedOptimizeConfig = "all";
+ props.put(TOPOLOGY_OPTIMIZATION, "all");
+ final StreamsConfig config = new StreamsConfig(props);
+ final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION);
+ assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
+ props.put(TOPOLOGY_OPTIMIZATION, "maybe");
+ new StreamsConfig(props);
+ }
static class MisconfiguredSerde implements Serde {
@Override
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.