You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/20 18:30:35 UTC
[kafka] branch trunk updated: [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e56ebbf [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)
e56ebbf is described below
commit e56ebbffca57741d398283e46073ed4170f8f927
Author: Ted Yu <yu...@gmail.com>
AuthorDate: Sat Apr 20 11:30:20 2019 -0700
[KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
docs/streams/developer-guide/datatypes.html | 2 +-
docs/streams/developer-guide/dsl-api.html | 3 ++-
.../org/apache/kafka/streams/KafkaStreams.java | 26 ++++++++++++++++++++++
.../streams/processor/internals/SinkNode.java | 8 +++++++
.../streams/processor/internals/SourceNode.java | 8 +++++++
.../state/internals/MeteredKeyValueStore.java | 19 +++++++++++++---
.../state/internals/MeteredSessionStore.java | 19 +++++++++++++---
.../internals/MeteredTimestampedKeyValueStore.java | 21 ++++++++++++++---
.../internals/MeteredTimestampedWindowStore.java | 21 ++++++++++++++---
.../state/internals/MeteredWindowStore.java | 19 +++++++++++++---
.../state/internals/MeteredKeyValueStoreTest.java | 2 ++
.../state/internals/MeteredSessionStoreTest.java | 2 ++
.../MeteredTimestampedKeyValueStoreTest.java | 2 ++
13 files changed, 135 insertions(+), 17 deletions(-)
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 83159e8..d78202a 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -68,7 +68,7 @@
</div>
<div class="section" id="overriding-default-serdes">
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
- <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
+ <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index f5c3df9..784dcca 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3547,7 +3547,8 @@ val clicksPerRegion: KTable[String, Long] =
// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)
</pre>
- <p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
+ <p>A complete example of user-defined SerDes can be found in a test class within the library.
+ Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
</div>
</div>
</div>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 315a6bb..e201bcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -634,6 +636,26 @@ public class KafkaStreams implements AutoCloseable {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
}
+ @SuppressWarnings("unchecked")
+ private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
+ for (final SinkNode sn : sinks) {
+ if (sn.getKeySerializer() != null) {
+ sn.getKeySerializer().configure(config.originals(), true);
+ }
+ if (sn.getValueSerializer() != null) {
+ sn.getValueSerializer().configure(config.originals(), false);
+ }
+ }
+ for (final SourceNode sn : sources) {
+ if (sn.getKeyDeSerializer() != null) {
+ sn.getKeyDeSerializer().configure(config.originals(), true);
+ }
+ if (sn.getValueDeSerializer() != null) {
+ sn.getValueDeSerializer().configure(config.originals(), false);
+ }
+ }
+ }
+
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@@ -670,6 +692,7 @@ public class KafkaStreams implements AutoCloseable {
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
+ configureSerDes(taskTopology.sinks(), taskTopology.sources());
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -683,6 +706,7 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@@ -696,6 +720,8 @@ public class KafkaStreams implements AutoCloseable {
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
+ configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
+
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 73bffc8..03e16c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -44,6 +44,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
this.partitioner = partitioner;
}
+ public Serializer getKeySerializer() {
+ return keySerializer;
+ }
+
+ public Serializer getValueSerializer() {
+ return valSerializer;
+ }
+
/**
* @throws UnsupportedOperationException if this method adds a child to a sink node
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 87505ca..bcd6475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,6 +52,14 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this(name, topics, null, keyDeserializer, valDeserializer);
}
+ public Deserializer getKeyDeSerializer() {
+ return keyDeserializer;
+ }
+
+ public Deserializer getValueDeSerializer() {
+ return valDeserializer;
+ }
+
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
return keyDeserializer.deserialize(topic, headers, data);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 51da3ed..277efd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -115,10 +115,23 @@ public class MeteredKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
+ final Serde<K> usedKeySerde;
+ final Serde<V> usedValueSerde;
+ final Map<String, Object> conf = context.appConfigs();
+ if (keySerde == null) {
+ usedKeySerde = (Serde<K>) context.keySerde();
+ } else {
+ usedKeySerde = keySerde;
+ usedKeySerde.configure(conf, true);
+ }
+ if (valueSerde == null) {
+ usedValueSerde = (Serde<V>) context.valueSerde();
+ } else {
+ usedValueSerde = valueSerde;
+ usedValueSerde.configure(conf, false);
+ }
serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 94b004e..1a55490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -69,10 +69,23 @@ public class MeteredSessionStore<K, V>
public void init(final ProcessorContext context,
final StateStore root) {
//noinspection unchecked
+ final Serde<K> usedKeySerde;
+ final Serde<V> usedValueSerde;
+ final Map<String, Object> conf = context.appConfigs();
+ if (keySerde == null) {
+ usedKeySerde = (Serde<K>) context.keySerde();
+ } else {
+ usedKeySerde = keySerde;
+ usedKeySerde.configure(conf, true);
+ }
+ if (valueSerde == null) {
+ usedValueSerde = (Serde<V>) context.valueSerde();
+ } else {
+ usedValueSerde = valueSerde;
+ usedValueSerde.configure(conf, false);
+ }
serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 2fa7c96..9a239e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import java.util.Map;
+
/**
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
@@ -48,9 +50,22 @@ public class MeteredTimestampedKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
+ final Serde<K> usedKeySerde;
+ final Serde<ValueAndTimestamp<V>> usedValueSerde;
+ final Map<String, Object> conf = context.appConfigs();
+ if (keySerde == null) {
+ usedKeySerde = (Serde<K>) context.keySerde();
+ } else {
+ usedKeySerde = keySerde;
+ usedKeySerde.configure(conf, true);
+ }
+ if (valueSerde == null) {
+ usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
+ } else {
+ usedValueSerde = valueSerde;
+ usedValueSerde.configure(conf, false);
+ }
serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 1c10491..7bc5486 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
+import java.util.Map;
+
/**
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
@@ -50,9 +52,22 @@ class MeteredTimestampedWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
void initStoreSerde(final ProcessorContext context) {
+ final Serde<K> usedKeySerde;
+ final Serde<ValueAndTimestamp<V>> usedValueSerde;
+ final Map<String, Object> conf = context.appConfigs();
+ if (keySerde == null) {
+ usedKeySerde = (Serde<K>) context.keySerde();
+ } else {
+ usedKeySerde = keySerde;
+ usedKeySerde.configure(conf, true);
+ }
+ if (valueSerde == null) {
+ usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
+ } else {
+ usedValueSerde = valueSerde;
+ usedValueSerde.configure(conf, false);
+ }
serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 6d2eaab..74de63e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -99,10 +99,23 @@ public class MeteredWindowStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
+ final Serde<K> usedKeySerde;
+ final Serde<V> usedValueSerde;
+ final Map<String, Object> conf = context.appConfigs();
+ if (keySerde == null) {
+ usedKeySerde = (Serde<K>) context.keySerde();
+ } else {
+ usedKeySerde = keySerde;
+ usedKeySerde.configure(conf, true);
+ }
+ if (valueSerde == null) {
+ usedValueSerde = (Serde<V>) context.valueSerde();
+ } else {
+ usedValueSerde = valueSerde;
+ usedValueSerde.configure(conf, false);
+ }
serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 5cbe95c..51c14ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -92,6 +93,7 @@ public class MeteredKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
+ expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 30c382b..e5eb9e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -91,6 +92,7 @@ public class MeteredSessionStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
+ expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f60d24..587d369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -94,6 +95,7 @@ public class MeteredTimestampedKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
+ expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}