You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 18:18:57 UTC
[kafka] branch trunk updated: KAFKA-6166: Streams configuration
requires consumer. and producer. in order to be read (#4434)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb93d76 KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)
cb93d76 is described below
commit cb93d764613d801a1185989f09ce2d6b76009020
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Tue Jan 30 18:18:51 2018 +0000
KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)
* Implement method to get custom properties
* Add custom properties to getConsumerConfigs and getProducerConfigs
* Add tests
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/StreamsConfig.java | 28 ++++++++++++++++-
.../apache/kafka/streams/StreamsConfigTest.java | 36 +++++++++++++++++++---
2 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0feb48d..1393223 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -693,6 +693,7 @@ public class StreamsConfig extends AbstractConfig {
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(getClientCustomProps());
consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
@@ -832,6 +833,7 @@ public class StreamsConfig extends AbstractConfig {
// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+ props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -847,7 +849,11 @@ public class StreamsConfig extends AbstractConfig {
* @return Map of the admin client configuration.
*/
public Map<String, Object> getAdminConfigs(final String clientId) {
- final Map<String, Object> props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+ final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+
+ final Map<String, Object> props = new HashMap<>();
+ props.putAll(getClientCustomProps());
+ props.putAll(clientProvidedProps);
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin");
@@ -863,6 +869,26 @@ public class StreamsConfig extends AbstractConfig {
}
/**
+ * Get a map of custom configs by removing from the originals all the Streams, Consumer, Producer, and AdminClient configs.
+ * Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String, Set)}.
+ * This allows setting a custom property for a single client when it is specified with that client's prefix,
+ * or for all clients when no prefix is used.
+ *
+ * @return a map with the custom properties
+ */
+ private Map<String, Object> getClientCustomProps() {
+ final Map<String, Object> props = originals(); // NOTE(review): mutated below; assumes originals() returns a fresh copy - confirm
+ props.keySet().removeAll(CONFIG.names()); // drop the Streams configs
+ props.keySet().removeAll(ConsumerConfig.configNames()); // drop the known consumer configs
+ props.keySet().removeAll(ProducerConfig.configNames()); // drop the known producer configs
+ props.keySet().removeAll(AdminClientConfig.configNames()); // drop the known admin client configs
+ props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet()); // drop keys returned for the consumer prefix
+ props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet()); // drop keys returned for the producer prefix
+ props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet()); // drop keys returned for the admin client prefix
+ return props; // whatever is left is treated as a custom property
+ }
+
+ /**
* Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde
* class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
*
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1d6b5a5..cc072d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -45,6 +45,7 @@ import java.util.Properties;
import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
@@ -66,7 +67,6 @@ public class StreamsConfigTest {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put("DUMMY", "dummy");
props.put("key.deserializer.encoding", "UTF8");
props.put("value.deserializer.encoding", "UTF-16");
streamsConfig = new StreamsConfig(props);
@@ -90,7 +90,6 @@ public class StreamsConfigTest {
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -101,7 +100,6 @@ public class StreamsConfigTest {
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -148,7 +146,6 @@ public class StreamsConfigTest {
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -265,6 +262,37 @@ public class StreamsConfigTest {
}
@Test
+ public void shouldForwardCustomConfigsWithNoPrefixToAllClients() { // an un-prefixed custom property must reach every client config
+ props.put("custom.property.host", "host"); // set before building the config so the test does not depend on StreamsConfig aliasing props
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+ final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+ assertEquals("host", consumerConfigs.get("custom.property.host"));
+ assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
+ assertEquals("host", producerConfigs.get("custom.property.host"));
+ assertEquals("host", adminConfigs.get("custom.property.host"));
+ }
+
+ @Test
+ public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() { // a prefixed custom property must win over the un-prefixed one
+ props.put("custom.property.host", "host0"); // un-prefixed fallback value, expected to be overridden for every client below
+ props.put(consumerPrefix("custom.property.host"), "host1");
+ props.put(producerPrefix("custom.property.host"), "host2");
+ props.put(adminClientPrefix("custom.property.host"), "host3");
+ final StreamsConfig streamsConfig = new StreamsConfig(props); // built last so it sees all four properties without relying on aliasing
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+ final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
+ assertEquals("host1", consumerConfigs.get("custom.property.host"));
+ assertEquals("host1", restoreConsumerConfigs.get("custom.property.host")); // the restore consumer is expected to pick up the consumer-prefixed value
+ assertEquals("host2", producerConfigs.get("custom.property.host"));
+ assertEquals("host3", adminConfigs.get("custom.property.host"));
+ }
+
+ @Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.RETRIES_CONFIG, 10);
final StreamsConfig streamsConfig = new StreamsConfig(props);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.