You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/29 21:08:43 UTC
[kafka] branch trunk updated: Cleanup KTableImpl#doTransformValues (#6519)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d10023e Cleanup KTableImpl#doTransformValues (#6519)
d10023e is described below
commit d10023e8d3093eaa6b0b3afa04c7ea1b9bee6c30
Author: Lee Dongjin <do...@apache.org>
AuthorDate: Sat Mar 30 06:08:20 2019 +0900
Cleanup KTableImpl#doTransformValues (#6519)
This PR is a follow-up to #6174 and #6453; it cleans up the KTableImpl#doTransformValues method.
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../streams/kstream/internals/KTableImpl.java | 31 +++++++++++++++-------
1 file changed, 22 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 67b3c31..e9291cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -295,12 +295,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
+ final Serde<K> keySerde;
+ final Serde<VR> valueSerde;
+ final String queryableStoreName;
+ final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
- final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+ if (materializedInternal != null) {
+ // don't inherit parent value serde, since this operation may change the value type, more specifically:
+ // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+ keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
+ // we preserve the value following the order of 1) materialized, 2) null
+ valueSerde = materializedInternal.valueSerde();
+ queryableStoreName = materializedInternal.queryableStoreName();
+ // only materialize if materialized is specified and it has queryable name
+ storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+ } else {
+ keySerde = this.keySerde;
+ valueSerde = null;
+ queryableStoreName = null;
+ storeBuilder = null;
+ }
- // only materialize if users provide a specific queryable name
- final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
- final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
+ final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
this,
@@ -320,13 +336,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(this.streamsGraphNode, tableNode);
- // don't inherit parent value serde, since this operation may change the value type, more specifically:
- // we preserve the key following the order of 1) materialized, 2) parent, 3) null
- // we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>(
name,
- materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
- materializedInternal != null ? materializedInternal.valueSerde() : null,
+ keySerde,
+ valueSerde,
sourceNodes,
queryableStoreName,
processorSupplier,