You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/20 18:30:35 UTC

[kafka] branch trunk updated: [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e56ebbf  [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)
e56ebbf is described below

commit e56ebbffca57741d398283e46073ed4170f8f927
Author: Ted Yu <yu...@gmail.com>
AuthorDate: Sat Apr 20 11:30:20 2019 -0700

    [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 docs/streams/developer-guide/datatypes.html        |  2 +-
 docs/streams/developer-guide/dsl-api.html          |  3 ++-
 .../org/apache/kafka/streams/KafkaStreams.java     | 26 ++++++++++++++++++++++
 .../streams/processor/internals/SinkNode.java      |  8 +++++++
 .../streams/processor/internals/SourceNode.java    |  8 +++++++
 .../state/internals/MeteredKeyValueStore.java      | 19 +++++++++++++---
 .../state/internals/MeteredSessionStore.java       | 19 +++++++++++++---
 .../internals/MeteredTimestampedKeyValueStore.java | 21 ++++++++++++++---
 .../internals/MeteredTimestampedWindowStore.java   | 21 ++++++++++++++---
 .../state/internals/MeteredWindowStore.java        | 19 +++++++++++++---
 .../state/internals/MeteredKeyValueStoreTest.java  |  2 ++
 .../state/internals/MeteredSessionStoreTest.java   |  2 ++
 .../MeteredTimestampedKeyValueStoreTest.java       |  2 ++
 13 files changed, 135 insertions(+), 17 deletions(-)

diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 83159e8..d78202a 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -68,7 +68,7 @@
     </div>
     <div class="section" id="overriding-default-serdes">
       <h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
-      <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
+      <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
 
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index f5c3df9..784dcca 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3547,7 +3547,8 @@ val clicksPerRegion: KTable[String, Long] =
 // Write the (continuously updating) results to the output topic.
 clicksPerRegion.toStream.to(outputTopic)
               </pre>
-              <p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
+              <p>A complete example of user-defined SerDes can be found in a test class within the library.
+   Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
             </div>
         </div>
 </div>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 315a6bb..e201bcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -634,6 +636,26 @@ public class KafkaStreams implements AutoCloseable {
         this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
     }
 
+    @SuppressWarnings("unchecked")
+    private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
+        for (final SinkNode sn : sinks) {
+            if (sn.getKeySerializer() != null) {
+                sn.getKeySerializer().configure(config.originals(), true);
+            }
+            if (sn.getValueSerializer() != null) {
+                sn.getValueSerializer().configure(config.originals(), false);
+            }
+        }
+        for (final SourceNode sn : sources) {
+            if (sn.getKeyDeSerializer() != null) {
+                sn.getKeyDeSerializer().configure(config.originals(), true);
+            }
+            if (sn.getValueDeSerializer() != null) {
+                sn.getValueDeSerializer().configure(config.originals(), false);
+            }
+        }
+    }
+
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier,
@@ -670,6 +692,7 @@ public class KafkaStreams implements AutoCloseable {
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
         final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
+        configureSerDes(taskTopology.sinks(), taskTopology.sources());
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -683,6 +706,7 @@ public class KafkaStreams implements AutoCloseable {
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
         final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
                 (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@@ -696,6 +720,8 @@ public class KafkaStreams implements AutoCloseable {
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
         if (globalTaskTopology != null) {
+            configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
+
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 73bffc8..03e16c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -44,6 +44,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         this.partitioner = partitioner;
     }
 
+    public Serializer getKeySerializer() {
+        return keySerializer;
+    }
+
+    public Serializer getValueSerializer() {
+        return valSerializer;
+    }
+
     /**
      * @throws UnsupportedOperationException if this method adds a child to a sink node
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 87505ca..bcd6475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,6 +52,14 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this(name, topics, null, keyDeserializer, valDeserializer);
     }
 
+    public Deserializer getKeyDeSerializer() {
+        return keyDeserializer;
+    }
+
+    public Deserializer getValueDeSerializer() {
+        return valDeserializer;
+    }
+
     K deserializeKey(final String topic, final Headers headers, final byte[] data) {
         return keyDeserializer.deserialize(topic, headers, data);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 51da3ed..277efd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -115,10 +115,23 @@ public class MeteredKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final Serde<K> usedKeySerde;
+        final Serde<V> usedValueSerde;
+        final Map<String, Object> conf = context.appConfigs();
+        if (keySerde == null) {
+            usedKeySerde = (Serde<K>) context.keySerde();
+        } else {
+            usedKeySerde = keySerde;
+            usedKeySerde.configure(conf, true);
+        }
+        if (valueSerde == null) {
+            usedValueSerde = (Serde<V>) context.valueSerde();
+        } else {
+            usedValueSerde = valueSerde;
+            usedValueSerde.configure(conf, false);
+        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 94b004e..1a55490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -69,10 +69,23 @@ public class MeteredSessionStore<K, V>
     public void init(final ProcessorContext context,
                      final StateStore root) {
         //noinspection unchecked
+        final Serde<K> usedKeySerde;
+        final Serde<V> usedValueSerde;
+        final Map<String, Object> conf = context.appConfigs();
+        if (keySerde == null) {
+            usedKeySerde = (Serde<K>) context.keySerde();
+        } else {
+            usedKeySerde = keySerde;
+            usedKeySerde.configure(conf, true);
+        }
+        if (valueSerde == null) {
+            usedValueSerde = (Serde<V>) context.valueSerde();
+        } else {
+            usedValueSerde = valueSerde;
+            usedValueSerde.configure(conf, false);
+        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
         metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 2fa7c96..9a239e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import java.util.Map;
+
 /**
  * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
  * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
@@ -48,9 +50,22 @@ public class MeteredTimestampedKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final Serde<K> usedKeySerde;
+        final Serde<ValueAndTimestamp<V>> usedValueSerde;
+        final Map<String, Object> conf = context.appConfigs();
+        if (keySerde == null) {
+            usedKeySerde = (Serde<K>) context.keySerde();
+        } else {
+            usedKeySerde = keySerde;
+            usedKeySerde.configure(conf, true);
+        }
+        if (valueSerde == null) { // no explicit serde: wrap the context's value serde so it handles ValueAndTimestamp<V>
+            usedValueSerde = new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde());
+        } else {
+            usedValueSerde = valueSerde;
+            usedValueSerde.configure(conf, false);
+        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 1c10491..7bc5486 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.util.Map;
+
 /**
  * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
  * inner WindowStore implementation do not need to provide its own metrics collecting functionality.
@@ -50,9 +52,22 @@ class MeteredTimestampedWindowStore<K, V>
     @SuppressWarnings("unchecked")
     @Override
     void initStoreSerde(final ProcessorContext context) {
+        final Serde<K> usedKeySerde;
+        final Serde<ValueAndTimestamp<V>> usedValueSerde;
+        final Map<String, Object> conf = context.appConfigs();
+        if (keySerde == null) {
+            usedKeySerde = (Serde<K>) context.keySerde();
+        } else {
+            usedKeySerde = keySerde;
+            usedKeySerde.configure(conf, true);
+        }
+        if (valueSerde == null) { // no explicit serde: wrap the context's value serde so it handles ValueAndTimestamp<V>
+            usedValueSerde = new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde());
+        } else {
+            usedValueSerde = valueSerde;
+            usedValueSerde.configure(conf, false);
+        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 6d2eaab..74de63e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -99,10 +99,23 @@ public class MeteredWindowStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final Serde<K> usedKeySerde;
+        final Serde<V> usedValueSerde;
+        final Map<String, Object> conf = context.appConfigs();
+        if (keySerde == null) {
+            usedKeySerde = (Serde<K>) context.keySerde();
+        } else {
+            usedKeySerde = keySerde;
+            usedKeySerde.configure(conf, true);
+        }
+        if (valueSerde == null) {
+            usedValueSerde = (Serde<V>) context.valueSerde();
+        } else {
+            usedValueSerde = valueSerde;
+            usedValueSerde.configure(conf, false);
+        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 5cbe95c..51c14ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -92,6 +93,7 @@ public class MeteredKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
+        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 30c382b..e5eb9e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -91,6 +92,7 @@ public class MeteredSessionStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
+        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f60d24..587d369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -94,6 +95,7 @@ public class MeteredTimestampedKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
+        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }