You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/29 21:08:43 UTC

[kafka] branch trunk updated: Cleanup KTableImpl#doTransformValues (#6519)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d10023e  Cleanup KTableImpl#doTransformValues (#6519)
d10023e is described below

commit d10023e8d3093eaa6b0b3afa04c7ea1b9bee6c30
Author: Lee Dongjin <do...@apache.org>
AuthorDate: Sat Mar 30 06:08:20 2019 +0900

    Cleanup KTableImpl#doTransformValues (#6519)
    
    This PR is a follow-up of #6174 and #6453, which cleans up KTableImpl#doTransformValues method.
    
    Reviewers: Bill Bejeck <bb...@gmail.com>
---
 .../streams/kstream/internals/KTableImpl.java      | 31 +++++++++++++++-------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 67b3c31..e9291cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -295,12 +295,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                                  final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                                  final String... stateStoreNames) {
         Objects.requireNonNull(stateStoreNames, "stateStoreNames");
+        final Serde<K> keySerde;
+        final Serde<VR> valueSerde;
+        final String queryableStoreName;
+        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
 
-        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+        if (materializedInternal != null) {
+            // don't inherit parent value serde, since this operation may change the value type, more specifically:
+            // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
+            // we preserve the value following the order of 1) materialized, 2) null
+            valueSerde = materializedInternal.valueSerde();
+            queryableStoreName = materializedInternal.queryableStoreName();
+            // only materialize if materialized is specified and it has queryable name
+            storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        } else {
+            keySerde = this.keySerde;
+            valueSerde = null;
+            queryableStoreName = null;
+            storeBuilder = null;
+        }
 
-        // only materialize if users provide a specific queryable name
-        final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
-        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
             this,
@@ -320,13 +336,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-        // don't inherit parent value serde, since this operation may change the value type, more specifically:
-        // we preserve the key following the order of 1) materialized, 2) parent, 3) null
-        // we preserve the value following the order of 1) materialized, 2) null
         return new KTableImpl<>(
             name,
-            materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
-            materializedInternal != null ? materializedInternal.valueSerde() : null,
+            keySerde,
+            valueSerde,
             sourceNodes,
             queryableStoreName,
             processorSupplier,