You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:14:55 UTC
kafka git commit: KAFKA-3929: Add prefix for underlying clients
configs in StreamConfig
Repository: kafka
Updated Branches:
refs/heads/trunk 3bb38d37b -> bb629f224
KAFKA-3929: Add prefix for underlying clients configs in StreamConfig
Add prefixes for consumer and producer configs to StreamsConfig, but be backward compatible.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #1649 from dguy/kafka-3929
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb629f22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb629f22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb629f22
Branch: refs/heads/trunk
Commit: bb629f2243c4462db2a863793c190d734f11f3c6
Parents: 3bb38d3
Author: Damian Guy <da...@gmail.com>
Authored: Tue Aug 2 14:14:52 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:14:52 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 83 ++++++++++++++++++--
.../apache/kafka/streams/StreamsConfigTest.java | 64 +++++++++++++++
2 files changed, 139 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a68de4f..b624e0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -47,6 +47,12 @@ public class StreamsConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
+ // Prefix used to isolate consumer configs from producer configs.
+ public static final String CONSUMER_PREFIX = "consumer.";
+
+ // Prefix used to isolate producer configs from consumer configs.
+ public static final String PRODUCER_PREFIX = "producer.";
+
/** <code>state.dir</code> */
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -122,6 +128,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+
static {
CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value
Type.STRING,
@@ -251,23 +258,55 @@ public class StreamsConfig extends AbstractConfig {
public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
}
+ /**
+ * Prefix a property with {@link StreamsConfig#CONSUMER_PREFIX}. This is used to isolate consumer configs
+ * from producer configs
+ * @param consumerProp
+ * @return CONSUMER_PREFIX + consumerProp
+ */
+ public static String consumerPrefix(final String consumerProp) {
+ return CONSUMER_PREFIX + consumerProp;
+ }
+
+ /**
+ * Prefix a property with {@link StreamsConfig#PRODUCER_PREFIX}. This is used to isolate producer configs
+ * from consumer configs
+ * @param producerProp
+ * @return PRODUCER_PREFIX + consumerProp
+ */
+ public static String producerPrefix(final String producerProp) {
+ return PRODUCER_PREFIX + producerProp;
+ }
+
public StreamsConfig(Map<?, ?> props) {
super(CONFIG, props);
}
+ /**
+ * Get the configs specific to the Consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX}
+ * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
+ * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+ * @param streamThread the {@link StreamThread} creating a consumer
+ * @param groupId consumer groupId
+ * @param clientId clientId
+ * @return Map of the Consumer configuration.
+ * @throws ConfigException
+ */
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
- Map<String, Object> originals = this.originals();
+ final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
// generate consumer configs from original properties and overridden maps
- Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
+ Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES);
+ // bootstrap.servers should be from StreamsConfig
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix, and group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
@@ -283,18 +322,30 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
+
+ /**
+ * Get the consumer config for the restore-consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX}
+ * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
+ * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+ * @param clientId clientId
+ * @return Map of the Consumer configuration
+ * @throws ConfigException
+ */
public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
- Map<String, Object> originals = this.originals();
+ Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
// generate consumer configs from original properties and overridden maps
- Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
+ Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES);
+
+ // bootstrap.servers should be from StreamsConfig
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// no need to set group id for a restore consumer
props.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -305,16 +356,32 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
+
+ /**
+ * Get the configs for the Producer. Properties using the prefix {@link StreamsConfig#PRODUCER_PREFIX}
+ * will be used in favor over their non-prefixed versions except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
+ * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster
+ * @param clientId clientId
+ * @return Map of the Consumer configuration
+ * @throws ConfigException
+ */
public Map<String, Object> getProducerConfigs(String clientId) {
// generate producer configs from original properties and overridden maps
- Map<String, Object> props = clientProps(ProducerConfig.configNames(), this.originals(), PRODUCER_DEFAULT_OVERRIDES);
-
+ Map<String, Object> props = clientProps(ProducerConfig.configNames(), getClientPropsWithPrefix(PRODUCER_PREFIX), PRODUCER_DEFAULT_OVERRIDES);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
return props;
}
+ private Map<String, Object> getClientPropsWithPrefix(final String prefix) {
+ // To be backward compatible we first get all the originals.
+ final Map<String, Object> props = this.originals();
+ props.putAll(this.originalsWithPrefix(prefix));
+ return props;
+ }
+
public Serde keySerde() {
Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
serde.configure(originals(), true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3d4a9cc..30306f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
+import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -108,4 +110,66 @@ public class StreamsConfigTest {
assertEquals(expectedBootstrapServers, actualBootstrapServers);
}
+ @Test
+ public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+ props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+ props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+ props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+ props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+ assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+
+ @Test
+ public void shouldSupportPrefixedProducerConfigs() throws Exception {
+ props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
+ props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+ assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
+ assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId");
+ assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
+ props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+ assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
+ assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+
}