You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/01 23:25:10 UTC
[kafka] branch trunk updated: KAFKA-7456: Serde Inheritance in DSL
(#5521)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d568f73 KAFKA-7456: Serde Inheritance in DSL (#5521)
d568f73 is described below
commit d568f73fc6ece3d29989413174eee0195d3d0a4a
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Oct 1 16:24:12 2018 -0700
KAFKA-7456: Serde Inheritance in DSL (#5521)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/kstream/WindowedSerdes.java | 4 +-
.../streams/kstream/internals/AbstractStream.java | 44 ++-
.../internals/GroupedStreamAggregateBuilder.java | 19 +-
.../kstream/internals/InternalStreamsBuilder.java | 33 ++-
.../kstream/internals/KGroupedStreamImpl.java | 29 +-
.../kstream/internals/KGroupedTableImpl.java | 48 ++--
.../streams/kstream/internals/KStreamImpl.java | 303 ++++++++++-----------
.../streams/kstream/internals/KTableImpl.java | 148 +++++-----
.../kstream/internals/KTableKTableJoinMerger.java | 14 +-
.../internals/SessionWindowedKStreamImpl.java | 46 ++--
.../kstream/internals/TimeWindowedKStreamImpl.java | 40 +--
.../GroupedTableOperationRepartitionNode.java | 5 +-
.../internals/graph/KTableKTableJoinNode.java | 2 +-
.../internals/graph/ProcessorParameters.java | 6 +-
.../kafka/streams/kstream/WindowedSerdesTest.java | 23 +-
.../kstream/internals/AbstractStreamTest.java | 11 +-
.../streams/kstream/internals/KStreamImplTest.java | 123 +++++++++
.../streams/kstream/internals/KStreamMapTest.java | 21 +-
.../streams/kstream/internals/KTableImplTest.java | 77 ++++++
.../kstream/internals/graph/StreamsGraphTest.java | 4 +-
20 files changed, 594 insertions(+), 406 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 6a851a1..2474860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -24,7 +24,7 @@ public class WindowedSerdes {
static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
// Default constructor needed for reflection object creation
public TimeWindowedSerde() {
- super(new TimeWindowedSerializer<T>(), new TimeWindowedDeserializer<T>());
+ super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
}
public TimeWindowedSerde(final Serde<T> inner) {
@@ -35,7 +35,7 @@ public class WindowedSerdes {
static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
// Default constructor needed for reflection object creation
public SessionWindowedSerde() {
- super(new SessionWindowedSerializer<T>(), new SessionWindowedDeserializer<T>());
+ super(new SessionWindowedSerializer<>(), new SessionWindowedDeserializer<>());
}
public SessionWindowedSerde(final Serde<T> inner) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index a0724eb..e870751 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -31,32 +32,48 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
-public abstract class AbstractStream<K> {
+/*
+ * Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as:
+ *
+ * 1) Overridden values via control objects (e.g. Materialized, Serialized, Consumed, etc)
+ * 2) Serdes that can be inferred from the operator itself (e.g. groupBy().count(), where value serde can default to `LongSerde`).
+ * 3) Serde inherited from parent operator if possible (note if the key / value types have been changed, then the corresponding serde cannot be inherited).
+ * 4) Default serde specified in the config.
+ */
+public abstract class AbstractStream<K, V> {
- protected final InternalStreamsBuilder builder;
protected final String name;
+ protected final Serde<K> keySerde;
+ protected final Serde<V> valSerde;
protected final Set<String> sourceNodes;
protected final StreamsGraphNode streamsGraphNode;
+ protected final InternalStreamsBuilder builder;
// This copy-constructor will allow to extend KStream
// and KTable APIs with new methods without impacting the public interface.
- public AbstractStream(final AbstractStream<K> stream) {
- this.builder = stream.builder;
+ public AbstractStream(final AbstractStream<K, V> stream) {
this.name = stream.name;
+ this.builder = stream.builder;
+ this.keySerde = stream.keySerde;
+ this.valSerde = stream.valSerde;
this.sourceNodes = stream.sourceNodes;
this.streamsGraphNode = stream.streamsGraphNode;
}
- AbstractStream(final InternalStreamsBuilder builder,
- final String name,
+ AbstractStream(final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
final Set<String> sourceNodes,
- final StreamsGraphNode streamsGraphNode) {
+ final StreamsGraphNode streamsGraphNode,
+ final InternalStreamsBuilder builder) {
if (sourceNodes == null || sourceNodes.isEmpty()) {
throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
}
- this.builder = builder;
this.name = name;
+ this.builder = builder;
+ this.keySerde = keySerde;
+ this.valSerde = valSerde;
this.sourceNodes = sourceNodes;
this.streamsGraphNode = streamsGraphNode;
}
@@ -67,7 +84,7 @@ public abstract class AbstractStream<K> {
return builder.internalTopologyBuilder;
}
- Set<String> ensureJoinableWith(final AbstractStream<K> other) {
+ Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) {
final Set<String> allSourceNodes = new HashSet<>();
allSourceNodes.addAll(sourceNodes);
allSourceNodes.addAll(other.sourceNodes);
@@ -122,4 +139,13 @@ public abstract class AbstractStream<K> {
}
};
}
+
+ // for testing only
+ public Serde<K> keySerde() {
+ return keySerde;
+ }
+
+ public Serde<V> valueSerde() {
+ return valSerde;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index a191c5a..9791db6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -63,11 +63,12 @@ class GroupedStreamAggregateBuilder<K, V> {
this.streamsGraphNode = streamsGraphNode;
}
-
- <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
- final String functionName,
+ <KR, T> KTable<KR, T> build(final String functionName,
final StoreBuilder<? extends StateStore> storeBuilder,
- final boolean isQueryable) {
+ final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
+ final boolean isQueryable,
+ final Serde<KR> keySerde,
+ final Serde<T> valSerde) {
final String aggFunctionName = builder.newProcessorName(functionName);
@@ -95,13 +96,15 @@ class GroupedStreamAggregateBuilder<K, V> {
builder.addGraphNode(parentNode, statefulProcessorNode);
- return new KTableImpl<>(builder,
- aggFunctionName,
- aggregateSupplier,
+ return new KTableImpl<>(aggFunctionName,
+ keySerde,
+ valSerde,
sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
storeBuilder.name(),
isQueryable,
- statefulProcessorNode);
+ aggregateSupplier,
+ statefulProcessorNode,
+ builder);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 49f49d0..8f76740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -79,30 +79,33 @@ public class InternalStreamsBuilder implements InternalNameProvider {
public <K, V> KStream<K, V> stream(final Collection<String> topics,
final ConsumedInternal<K, V> consumed) {
final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
- final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name,
- topics,
- consumed);
addGraphNode(root, streamSourceNode);
- return new KStreamImpl<>(this, name, Collections.singleton(name), false, streamSourceNode);
+ return new KStreamImpl<>(name,
+ consumed.keySerde(),
+ consumed.valueSerde(),
+ Collections.singleton(name),
+ false,
+ streamSourceNode,
+ this);
}
public <K, V> KStream<K, V> stream(final Pattern topicPattern,
final ConsumedInternal<K, V> consumed) {
final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
-
- final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name,
- topicPattern,
- consumed);
+ final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
addGraphNode(root, streamPatternSourceNode);
- return new KStreamImpl<>(this,
- name,
+ return new KStreamImpl<>(name,
+ consumed.keySerde(),
+ consumed.valueSerde(),
Collections.singleton(name),
false,
- streamPatternSourceNode);
+ streamPatternSourceNode,
+ this);
}
@SuppressWarnings("unchecked")
@@ -129,15 +132,15 @@ public class InternalStreamsBuilder implements InternalNameProvider {
addGraphNode(root, tableSourceNode);
- return new KTableImpl<>(this,
- name,
- processorSupplier,
+ return new KTableImpl<>(name,
consumed.keySerde(),
consumed.valueSerde(),
Collections.singleton(source),
storeBuilder.name(),
materialized.isQueryable(),
- tableSourceNode);
+ processorSupplier,
+ tableSourceNode,
+ this);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5d4f9f3..da2eeb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -36,24 +36,22 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Objects;
import java.util.Set;
-class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
+class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedStream<K, V> {
static final String REDUCE_NAME = "KSTREAM-REDUCE-";
static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
- private final Serde<K> keySerde;
- private final Serde<V> valSerde;
private final boolean repartitionRequired;
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
- KGroupedStreamImpl(final InternalStreamsBuilder builder,
- final String name,
- final Set<String> sourceNodes,
+ KGroupedStreamImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
+ final Set<String> sourceNodes,
final boolean repartitionRequired,
- final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
+ final StreamsGraphNode streamsGraphNode,
+ final InternalStreamsBuilder builder) {
+ super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
builder,
keySerde,
@@ -63,8 +61,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
name,
streamsGraphNode
);
- this.keySerde = keySerde;
- this.valSerde = valSerde;
this.repartitionRequired = repartitionRequired;
}
@@ -189,14 +185,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
);
}
- private <KR, T> KTable<KR, T> doAggregate(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
- final String functionName,
- final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier,
+ final String functionName,
+ final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
return aggregateBuilder.build(
- aggregateSupplier,
functionName,
new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
- materializedInternal.isQueryable()
- );
+ aggregateSupplier,
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde(),
+ materializedInternal.valueSerde());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 08fb605..6ec3c0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
/**
* The implementation class of {@link KGroupedTable}.
@@ -41,14 +42,12 @@ import java.util.Objects;
* @param <K> the key type
* @param <V> the value type
*/
-public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> {
+public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGroupedTable<K, V> {
private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
private static final String REDUCE_NAME = "KTABLE-REDUCE-";
- protected final Serde<K> keySerde;
- protected final Serde<V> valSerde;
private final Initializer<Long> countInitializer = () -> 0L;
private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
@@ -57,16 +56,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
KGroupedTableImpl(final InternalStreamsBuilder builder,
final String name,
- final String sourceName,
+ final Set<String> sourceNodes,
final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamsGraphNode streamsGraphNode) {
- super(builder, name, Collections.singleton(sourceName), streamsGraphNode);
- this.keySerde = keySerde;
- this.valSerde = valSerde;
+ super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
}
- @SuppressWarnings("unchecked")
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
@@ -75,9 +71,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
final String funcName = builder.newProcessorName(functionName);
final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
- final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName,
- sourceName,
- topic);
+ final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, topic);
// the passed in StreamsGraphNode must be the parent of the repartition node
builder.addGraphNode(this.streamsGraphNode, repartitionNode);
@@ -90,34 +84,34 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
builder.addGraphNode(repartitionNode, statefulProcessorNode);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(builder,
- funcName,
- aggregateSupplier,
+ return new KTableImpl<>(funcName,
+ materialized.keySerde(),
+ materialized.valueSerde(),
Collections.singleton(sourceName),
materialized.storeName(),
materialized.isQueryable(),
- statefulProcessorNode);
+ aggregateSupplier,
+ statefulProcessorNode,
+ builder);
}
- @SuppressWarnings("unchecked")
private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
final String functionName,
- final ProcessorSupplier aggregateSupplier) {
+ final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
- final ProcessorParameters aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
+ final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
return StatefulProcessorNode.statefulProcessorNodeBuilder()
.withNodeName(functionName)
.withProcessorParameters(aggregateFunctionProcessorParams)
- .withStoreBuilder(new KeyValueStoreMaterializer(materialized).materialize()).build();
+ .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build();
}
- @SuppressWarnings("unchecked")
- private GroupedTableOperationRepartitionNode createRepartitionNode(final String sinkName,
- final String sourceName,
- final String topic) {
+ private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
+ final String sourceName,
+ final String topic) {
- return GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder()
+ return GroupedTableOperationRepartitionNode.<K, V>groupedTableOperationNodeBuilder()
.withRepartitionTopic(topic)
.withSinkName(sinkName)
.withSourceName(sourceName)
@@ -151,7 +145,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor) {
- return reduce(adder, subtractor, Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde));
+ return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
}
@Override
@@ -176,7 +170,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
@Override
public KTable<K, Long> count() {
- return count(Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>with(keySerde, Serdes.Long()));
+ return count(Materialized.with(keySerde, Serdes.Long()));
}
@Override
@@ -206,7 +200,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor) {
- return aggregate(initializer, adder, subtractor, Materialized.<K, T, KeyValueStore<Bytes, byte[]>>with(keySerde, null));
+ return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 42c20a5..2a3bc8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
-public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
+public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
static final String SOURCE_NAME = "KSTREAM-SOURCE-";
@@ -113,12 +113,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private final boolean repartitionRequired;
- KStreamImpl(final InternalStreamsBuilder builder,
- final String name,
+ KStreamImpl(final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
final Set<String> sourceNodes,
final boolean repartitionRequired,
- final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
+ final StreamsGraphNode streamsGraphNode,
+ final InternalStreamsBuilder builder) {
+ super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, builder);
this.repartitionRequired = repartitionRequired;
}
@@ -129,12 +131,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
- final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name,
- processorParameters,
- repartitionRequired);
+ final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
- return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterProcessorNode);
+ return new KStreamImpl<>(name,
+ keySerde,
+ valSerde,
+ sourceNodes,
+ repartitionRequired,
+ filterProcessorNode,
+ builder);
}
@Override
@@ -143,46 +149,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = builder.newProcessorName(FILTER_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
-
-
- final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name,
- processorParameters,
- repartitionRequired);
+ final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
- return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterNotProcessorNode);
+ return new KStreamImpl<>(name,
+ keySerde,
+ valSerde,
+ sourceNodes,
+ repartitionRequired,
+ filterNotProcessorNode,
+ builder);
}
@Override
public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
-
final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
selectKeyProcessorNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
- return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), sourceNodes, true, selectKeyProcessorNode);
+
+ // key serde cannot be preserved
+ return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyProcessorNode, builder);
}
private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
final String name = builder.newProcessorName(KEY_SELECT_NAME);
-
- final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>(
- (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value));
-
+ final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
- return new ProcessorGraphNode<>(
- name,
- processorParameters,
- repartitionRequired
- );
-
+ return new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
}
@Override
@@ -192,14 +193,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
- final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
- processorParameters,
- true);
+ final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters, true);
mapProcessorNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
- return new KStreamImpl<>(builder, name, sourceNodes, true, mapProcessorNode);
+ // key and value serde cannot be preserved
+ return new KStreamImpl<>(name,
+ null,
+ null,
+ sourceNodes,
+ true,
+ mapProcessorNode,
+ builder);
}
@@ -214,15 +220,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = builder.newProcessorName(MAPVALUES_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
+ final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
-
- final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
- processorParameters,
- repartitionRequired);
mapValuesProcessorNode.setValueChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
- return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode);
+ // value serde cannot be preserved
+ return new KStreamImpl<>(name,
+ keySerde,
+ null,
+ sourceNodes,
+ repartitionRequired,
+ mapValuesProcessorNode,
+ builder);
}
@Override
@@ -232,31 +242,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = builder.newProcessorName(PRINTING_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
+ final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters, false);
-
- final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
- processorParameters,
- false);
builder.addGraphNode(this.streamsGraphNode, printNode);
}
@Override
- public <K1, V1> KStream<K1, V1> flatMap(
- final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+ public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
final String name = builder.newProcessorName(FLATMAP_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
-
-
- final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
- processorParameters,
- true);
+ final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters, true);
flatMapNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, flatMapNode);
- return new KStreamImpl<>(builder, name, sourceNodes, true, flatMapNode);
+ // key and value serde cannot be preserved
+ return new KStreamImpl<>(name,
+ null,
+ null,
+ sourceNodes,
+ true,
+ flatMapNode,
+ builder);
}
@Override
@@ -270,15 +279,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
+ final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
-
- final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
- processorParameters,
- repartitionRequired);
flatMapValuesNode.setValueChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
- return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode);
+ // value serde cannot be preserved
+ return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, flatMapValuesNode, builder);
}
@Override
@@ -298,25 +305,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
}
-
final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
-
- final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName,
- processorParameters,
- false);
+ final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, processorParameters, false);
builder.addGraphNode(this.streamsGraphNode, branchNode);
final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
for (int i = 0; i < predicates.length; i++) {
final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
-
- final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i],
- innerProcessorParameters,
- repartitionRequired);
+ final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters, repartitionRequired);
builder.addGraphNode(branchNode, branchChildNode);
- branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired, branchChildNode);
+ branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, sourceNodes, repartitionRequired, branchChildNode, builder);
}
return branchChildren;
@@ -347,22 +347,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
mergeNode.setMergeNode(true);
builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
- return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode);
- }
- @Override
- public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
- final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
- to(topic, producedInternal);
- return builder.stream(
- Collections.singleton(topic),
- new ConsumedInternal<>(
- producedInternal.keySerde(),
- producedInternal.valueSerde(),
- new FailOnInvalidTimestamp(),
- null
- )
- );
+ // drop the serde as we cannot safely use either one to represent both streams
+ return new KStreamImpl<>(name, null, null, allSourceNodes, requireRepartitioning, mergeNode, builder);
}
@Override
@@ -375,7 +362,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
name
);
-
final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
processorParameters,
repartitionRequired);
@@ -394,12 +380,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name,
processorParameters,
- repartitionRequired
- );
+ repartitionRequired);
builder.addGraphNode(this.streamsGraphNode, peekNode);
- return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired, peekNode);
+ return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, repartitionRequired, peekNode, builder);
}
@Override
@@ -408,6 +393,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
+ public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+ final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+ to(topic, producedInternal);
+ return builder.stream(
+ Collections.singleton(topic),
+ new ConsumedInternal<>(
+ producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
+ producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+ new FailOnInvalidTimestamp(),
+ null
+ )
+ );
+ }
+
+ @Override
public void to(final String topic) {
to(topic, Produced.with(null, null, null));
}
@@ -431,7 +431,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
to(topicExtractor, new ProducedInternal<>(produced));
}
- @SuppressWarnings("unchecked")
private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
final String name = builder.newProcessorName(SINK_NAME);
@@ -461,8 +460,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
transformNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, transformNode);
-
- return new KStreamImpl<>(builder, name, sourceNodes, true, transformNode);
+ // cannot inherit key and value serde
+ return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
}
@Override
@@ -496,7 +495,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
transformNode.setValueChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, transformNode);
- return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode);
+ // cannot inherit value serde
+ return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder);
}
@Override
@@ -565,11 +565,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
if (joinThis.repartitionRequired) {
- joinThis = joinThis.repartitionForJoin(joined.keySerde(), joined.valueSerde());
+ joinThis = joinThis.repartitionForJoin(joined);
}
if (joinOther.repartitionRequired) {
- joinOther = joinOther.repartitionForJoin(joined.keySerde(), joined.otherValueSerde());
+ joinOther = joinOther.repartitionForJoin(Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde()));
}
joinThis.ensureJoinableWith(joinOther);
@@ -587,17 +587,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
* Repartition a stream. This is required on join operations occurring after
* an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
*
- * @param keySerde Serdes for serializing the keys
- * @param valSerde Serdes for serializing the values
+ * @param joined joined control object
* @return a new {@link KStreamImpl}
*/
- private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
- final Serde<V> valSerde) {
-
+ private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) {
+ final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde;
+ final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde;
final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
final String repartitionedSourceName = createRepartitionedSource(builder,
- keySerde,
- valSerde,
+ repartitionKeySerde,
+ repartitionValueSerde,
null,
name,
optimizableRepartitionNodeBuilder);
@@ -605,7 +604,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
builder.addGraphNode(this.streamsGraphNode, optimizableRepartitionNode);
- return new KStreamImpl<>(builder, repartitionedSourceName, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode);
+ return new KStreamImpl<>(repartitionedSourceName, repartitionKeySerde, repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode, builder);
}
static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
@@ -678,18 +677,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
if (repartitionRequired) {
- final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined.keySerde(), joined.valueSerde());
- return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
+ final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
} else {
- return doStreamTableJoin(other, joiner, false);
+ return doStreamTableJoin(other, joiner, joined, false);
}
}
@Override
- public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
- final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
- final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
- return globalTableJoin(globalTable, keyMapper, joiner, true);
+ public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ return leftJoin(other, joiner, Joined.with(null, null, null));
+ }
+
+ @Override
+ public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
+ final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
+ final Joined<K, V, VT> joined) {
+ Objects.requireNonNull(other, "other can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+ Objects.requireNonNull(joined, "joined can't be null");
+ if (repartitionRequired) {
+ final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
+ } else {
+ return doStreamTableJoin(other, joiner, joined, true);
+ }
}
@Override
@@ -699,6 +711,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return globalTableJoin(globalTable, keyMapper, joiner, false);
}
+ @Override
+ public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
+ final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+ return globalTableJoin(globalTable, keyMapper, joiner, true);
+ }
+
private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
final ValueJoiner<? super V, ? super V1, ? extends V2> joiner,
@@ -724,17 +743,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
null);
builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
- return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
+ // do not have serde for joined result
+ return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
}
@SuppressWarnings("unchecked")
private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+ final Joined<K, V, V1> joined,
final boolean leftJoin) {
Objects.requireNonNull(other, "other KTable can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+ final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other);
final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
@@ -743,7 +764,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
leftJoin
);
-
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
name,
@@ -754,27 +774,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
- return new KStreamImpl<>(builder, name, allSourceNodes, false, streamTableJoinNode);
- }
-
- @Override
- public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
- return leftJoin(other, joiner, Joined.with(null, null, null));
- }
-
- @Override
- public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
- final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
- final Joined<K, V, VT> joined) {
- Objects.requireNonNull(other, "other can't be null");
- Objects.requireNonNull(joiner, "joiner can't be null");
- Objects.requireNonNull(joined, "joined can't be null");
- if (repartitionRequired) {
- final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
- return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
- } else {
- return doStreamTableJoin(other, joiner, true);
- }
+ // do not have serde for joined result
+ return new KStreamImpl<>(name, joined.keySerde() != null ? joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, builder);
}
@Override
@@ -792,16 +793,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
selectKeyMapNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
- return new KGroupedStreamImpl<>(
- builder,
- selectKeyMapNode.nodeName(),
- sourceNodes,
- serializedInternal.keySerde(),
- serializedInternal.valueSerde(),
- true,
- selectKeyMapNode
- );
-
+ return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(),
+ serializedInternal.keySerde(),
+ serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
+ sourceNodes,
+ true,
+ selectKeyMapNode,
+ builder);
}
@Override
@@ -812,14 +810,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
- return new KGroupedStreamImpl<>(builder,
- this.name,
+ return new KGroupedStreamImpl<>(this.name,
+ serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde,
+ serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
sourceNodes,
- serializedInternal.keySerde(),
- serializedInternal.valueSerde(),
this.repartitionRequired,
- streamsGraphNode);
-
+ streamsGraphNode,
+ builder);
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -851,7 +848,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
this.rightOuter = rightOuter;
}
- @SuppressWarnings("unchecked")
public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
final KStream<K1, V2> other,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
@@ -874,13 +870,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
- final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
+ final ProcessorParameters<K1, V1> thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamName);
final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
- final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
+ final ProcessorParameters<K1, V2> otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamName);
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
@@ -902,11 +898,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
- final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
+ final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
- final ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
- final ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
- final ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
+ final ProcessorParameters<K1, V1> joinThisProcessorParams = new ProcessorParameters<>(joinThis, joinThisName);
+ final ProcessorParameters<K1, V2> joinOtherProcessorParams = new ProcessorParameters<>(joinOther, joinOtherName);
+ final ProcessorParameters<K1, R> joinMergeProcessorParams = new ProcessorParameters<>(joinMerge, joinMergeName);
joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
.withJoinThisProcessorParameters(joinThisProcessorParams)
@@ -922,9 +918,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, otherStreamsGraphNode), joinGraphNode);
- final Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
+ final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K1, V1>) lhs).sourceNodes);
allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
- return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, joinGraphNode);
+
+ // do not have serde for joined result;
+ // also for key serde we do not inherit from either since we cannot tell if these two serdes are different
+ return new KStreamImpl<>(joinMergeName, joined.keySerde(), null, allSourceNodes, false, joinGraphNode, builder);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ea5c304..c5b2970 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -57,7 +57,7 @@ import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchU
* @param <S> the source's (parent's) value type
* @param <V> the value type
*/
-public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
+public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
static final String SOURCE_NAME = "KTABLE-SOURCE-";
@@ -87,38 +87,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private final boolean isQueryable;
private boolean sendOldValues = false;
- private final Serde<K> keySerde;
- private final Serde<V> valSerde;
- public KTableImpl(final InternalStreamsBuilder builder,
- final String name,
- final ProcessorSupplier<?, ?> processorSupplier,
- final Set<String> sourceNodes,
- final String queryableStoreName,
- final boolean isQueryable,
- final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
- this.processorSupplier = processorSupplier;
- this.queryableStoreName = queryableStoreName;
- this.keySerde = null;
- this.valSerde = null;
- this.isQueryable = isQueryable;
- }
-
- public KTableImpl(final InternalStreamsBuilder builder,
- final String name,
- final ProcessorSupplier<?, ?> processorSupplier,
+ public KTableImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Set<String> sourceNodes,
final String queryableStoreName,
final boolean isQueryable,
- final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
+ final ProcessorSupplier<?, ?> processorSupplier,
+ final StreamsGraphNode streamsGraphNode,
+ final InternalStreamsBuilder builder) {
+ super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
this.processorSupplier = processorSupplier;
this.queryableStoreName = queryableStoreName;
- this.keySerde = keySerde;
- this.valSerde = valSerde;
this.isQueryable = isQueryable;
}
@@ -159,18 +140,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.addGraphNode(this.streamsGraphNode, tableNode);
-
- return new KTableImpl<>(
- builder,
- name,
- processorSupplier,
- this.keySerde,
- this.valSerde,
- sourceNodes,
- shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
- shouldMaterialize,
- tableNode
- );
+ // we can inherit parent key and value serde if user do not provide specific overrides, more specifically:
+ // we preserve the key following the order of 1) materialized, 2) parent
+ // we preserve the value following the order of 1) materialized, 2) parent
+ return new KTableImpl<>(name,
+ materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+ materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : valSerde,
+ sourceNodes,
+ shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+ shouldMaterialize,
+ processorSupplier,
+ tableNode,
+ builder);
}
@Override
@@ -234,14 +215,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.addGraphNode(this.streamsGraphNode, tableNode);
+ // don't inherit parent value serde, since this operation may change the value type, more specifically:
+ // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+ // we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>(
- builder,
name,
- processorSupplier,
+ materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+ materializedInternal != null ? materializedInternal.valueSerde() : null,
sourceNodes,
shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
shouldMaterialize,
- tableNode
+ processorSupplier,
+ tableNode,
+ builder
);
}
@@ -325,14 +311,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.addGraphNode(this.streamsGraphNode, tableNode);
+ // don't inherit parent value serde, since this operation may change the value type, more specifically:
+ // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+ // we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>(
- builder,
name,
- processorSupplier,
+ materialized != null && materialized.keySerde() != null ? materialized.keySerde() : keySerde,
+ materialized != null ? materialized.valueSerde() : null,
sourceNodes,
shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
shouldMaterialize,
- tableNode);
+ processorSupplier,
+ tableNode,
+ builder);
}
@Override
@@ -352,7 +343,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.addGraphNode(this.streamsGraphNode, toStreamNode);
- return new KStreamImpl<>(builder, name, sourceNodes, false, toStreamNode);
+ // we can inherit parent key and value serde
+ return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, false, toStreamNode, builder);
}
@Override
@@ -382,15 +374,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
builder.addGraphNode(streamsGraphNode, node);
return new KTableImpl<K, S, V>(
- builder,
name,
- suppressionSupplier,
keySerde,
valSerde,
Collections.singleton(this.name),
null,
false,
- node
+ suppressionSupplier,
+ node,
+ builder
);
}
@@ -475,7 +467,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String joinMergeName = builder.newProcessorName(MERGE_NAME);
return buildJoin(
- (AbstractStream<K>) other,
+ (AbstractStream<K, VO>) other,
joiner,
leftOuter,
rightOuter,
@@ -485,8 +477,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
);
}
- @SuppressWarnings("unchecked")
- private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
+ private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final boolean leftOuter,
final boolean rightOuter,
@@ -520,29 +511,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
}
- final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
- new KTableImpl<K, V, R>(
- builder,
- joinThisName,
- joinThis,
- sourceNodes,
- this.queryableStoreName,
- false,
- null
- ),
- new KTableImpl<K, V1, R>(
- builder,
- joinOtherName,
- joinOther,
- ((KTableImpl<K, ?, ?>) other).sourceNodes,
- ((KTableImpl<K, ?, ?>) other).queryableStoreName,
- false,
- null
- ),
- internalQueryableName
- );
+ final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName);
- final KTableKTableJoinNode.KTableKTableJoinNodeBuilder kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
+ final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, Change<V>, Change<V1>, Change<R>> kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
// only materialize if specified in Materialized
if (materializedInternal != null) {
@@ -550,9 +521,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
kTableJoinNodeBuilder.withNodeName(joinMergeName);
- final ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
- final ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
- final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
+ final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
+ final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
+ final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = new ProcessorParameters<>(joinMerge, joinMergeName);
kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
@@ -562,23 +533,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
.withOtherJoinSideNodeName(((KTableImpl) other).name)
.withThisJoinSideNodeName(name);
- final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
+ final KTableKTableJoinNode<K, Change<V>, Change<V1>, Change<R>> kTableKTableJoinNode = kTableJoinNodeBuilder.build();
builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
- return new KTableImpl<>(
- builder,
+ // we can inherit parent key serde if user do not provide specific overrides
+ return new KTableImpl<K, Change<R>, R>(
joinMergeName,
- joinMerge,
+ materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+ materializedInternal != null ? materializedInternal.valueSerde() : null,
allSourceNodes,
internalQueryableName,
internalQueryableName != null,
- kTableKTableJoinNode
+ joinMerge,
+ kTableKTableJoinNode,
+ builder
);
}
@Override
public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
- return this.groupBy(selector, Serialized.with(null, null));
+ return groupBy(selector, Serialized.with(null, null));
}
@Override
@@ -592,20 +566,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
// select the aggregate key and values (old and new), it would require parent to send old values
- final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(
- selectName,
- processorParameters,
- false
- );
+ final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName, processorParameters, false);
builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
this.enableSendingOldValues();
+
final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
+
+ // we cannot inherit parent key and value serdes since both of them may have changed;
+ // we can only inherit from what serialized specified here
return new KGroupedTableImpl<>(
builder,
selectName,
- this.name,
+ sourceNodes,
serializedInternal.keySerde(),
serializedInternal.valueSerde(),
groupByMapNode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 5c464b9..2ed70bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -27,13 +27,13 @@ import java.util.Set;
class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
- private final KTableImpl<K, ?, V> parent1;
- private final KTableImpl<K, ?, V> parent2;
+ private final KTableProcessorSupplier<K, ?, V> parent1;
+ private final KTableProcessorSupplier<K, ?, V> parent2;
private final String queryableName;
private boolean sendOldValues = false;
- KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
- final KTableImpl<K, ?, V> parent2,
+ KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, V> parent1,
+ final KTableProcessorSupplier<K, ?, V> parent2,
final String queryableName) {
this.parent1 = parent1;
this.parent2 = parent2;
@@ -55,13 +55,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
return new KTableValueGetterSupplier<K, V>() {
public KTableValueGetter<K, V> get() {
- return parent1.valueGetterSupplier().get();
+ return parent1.view().get();
}
@Override
public String[] storeNames() {
- final String[] storeNames1 = parent1.valueGetterSupplier().storeNames();
- final String[] storeNames2 = parent2.valueGetterSupplier().storeNames();
+ final String[] storeNames1 = parent1.view().storeNames();
+ final String[] storeNames2 = parent2.view().storeNames();
final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
Collections.addAll(stores, storeNames1);
Collections.addAll(stores, storeNames2);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 98076e0..e185f4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
@@ -40,10 +41,8 @@ import java.util.Set;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements SessionWindowedKStream<K, V> {
private final SessionWindows windows;
- private final Serde<K> keySerde;
- private final Serde<V> valSerde;
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
@@ -55,11 +54,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
final Serde<V> valSerde,
final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
+ super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
Objects.requireNonNull(windows, "windows can't be null");
this.windows = windows;
- this.keySerde = keySerde;
- this.valSerde = valSerde;
this.aggregateBuilder = aggregateBuilder;
}
@@ -92,16 +89,17 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
}
return aggregateBuilder.build(
+ AGGREGATE_NAME,
+ materialize(materializedInternal),
new KStreamSessionWindowAggregate<>(
- windows, materializedInternal.storeName(),
+ windows,
+ materializedInternal.storeName(),
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
- countMerger
- ),
- AGGREGATE_NAME,
- materialize(materializedInternal),
- materializedInternal.isQueryable()
- );
+ countMerger),
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -125,6 +123,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
}
return aggregateBuilder.build(
+ REDUCE_NAME,
+ materialize(materializedInternal),
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -132,10 +132,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
reduceAggregator,
mergerForAggregator(reduceAggregator)
),
- REDUCE_NAME,
- materialize(materializedInternal),
- materializedInternal.isQueryable()
- );
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -160,18 +159,19 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
+
return aggregateBuilder.build(
+ AGGREGATE_NAME,
+ materialize(materializedInternal),
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
initializer,
aggregator,
- sessionMerger
- ),
- AGGREGATE_NAME,
- materialize(materializedInternal),
- materializedInternal.isQueryable()
- );
+ sessionMerger),
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
@SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 5c5cfb2..2ee8f7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -40,11 +41,9 @@ import java.util.Set;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
-public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> {
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
private final Windows<W> windows;
- private final Serde<K> keySerde;
- private final Serde<V> valSerde;
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
TimeWindowedKStreamImpl(final Windows<W> windows,
@@ -55,9 +54,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
final Serde<V> valSerde,
final boolean repartitionRequired,
final StreamsGraphNode streamsGraphNode) {
- super(builder, name, sourceNodes, streamsGraphNode);
- this.valSerde = valSerde;
- this.keySerde = keySerde;
+ super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
this.windows = Objects.requireNonNull(windows, "windows can't be null");
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
}
@@ -92,19 +89,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
return aggregateBuilder.build(
- new KStreamWindowAggregate<>(
- windows,
- materializedInternal.storeName(),
- aggregateBuilder.countInitializer,
- aggregateBuilder.countAggregator
- ),
AGGREGATE_NAME,
materialize(materializedInternal),
- materializedInternal.isQueryable()
- );
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
-
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
@@ -124,15 +116,12 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
materializedInternal.withKeySerde(keySerde);
}
return aggregateBuilder.build(
- new KStreamWindowAggregate<>(
- windows,
- materializedInternal.storeName(),
- initializer,
- aggregator
- ),
AGGREGATE_NAME,
materialize(materializedInternal),
- materializedInternal.isQueryable());
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
@Override
@@ -156,11 +145,12 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
return aggregateBuilder.build(
- new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
REDUCE_NAME,
materialize(materializedInternal),
- materializedInternal.isQueryable()
- );
+ new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
+ materializedInternal.isQueryable(),
+ materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+ materializedInternal.valueSerde());
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index 97fb69d..4d1b67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -102,11 +102,10 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
}
- public static GroupedTableOperationRepartitionNodeBuilder groupedTableOperationNodeBuilder() {
- return new GroupedTableOperationRepartitionNodeBuilder();
+ public static <K1, V1> GroupedTableOperationRepartitionNodeBuilder<K1, V1> groupedTableOperationNodeBuilder() {
+ return new GroupedTableOperationRepartitionNodeBuilder<>();
}
-
public static final class GroupedTableOperationRepartitionNodeBuilder<K, V> {
private Serde<K> keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index b63b66d..41c27ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -99,7 +99,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
"} " + super.toString();
}
- public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
+ public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
return new KTableKTableJoinNodeBuilder<>();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index eb3d9f6..4b767b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
*/
public class ProcessorParameters<K, V> {
- private final ProcessorSupplier<K, V> processorSupplier;
+ private final ProcessorSupplier<? super K, ? super V> processorSupplier;
private final String processorName;
- public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+ public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier,
final String processorName) {
this.processorSupplier = processorSupplier;
this.processorName = processorName;
}
- public ProcessorSupplier<K, V> processorSupplier() {
+ public ProcessorSupplier<? super K, ? super V> processorSupplier() {
return processorSupplier;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index 4360d08..ff180d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -17,16 +17,38 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
public class WindowedSerdesTest {
private final String topic = "sample";
@Test
+ public void shouldWrapForTimeWindowedSerde() {
+ final Serde<Windowed<String>> serde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+ assertTrue(serde.serializer() instanceof TimeWindowedSerializer);
+ assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer);
+ assertTrue(((TimeWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer);
+ assertTrue(((TimeWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+ }
+
+ @Test
+ public void shouldWrapForSessionWindowedSerde() {
+ final Serde<Windowed<String>> serde = WindowedSerdes.sessionWindowedSerdeFrom(String.class);
+ assertTrue(serde.serializer() instanceof SessionWindowedSerializer);
+ assertTrue(serde.deserializer() instanceof SessionWindowedDeserializer);
+ assertTrue(((SessionWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer);
+ assertTrue(((SessionWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+ }
+
+ @Test
public void testTimeWindowSerdeFrom() {
final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE));
final Serde<Windowed<Integer>> timeWindowedSerde = WindowedSerdes.timeWindowedSerdeFrom(Integer.class);
@@ -43,5 +65,4 @@ public class WindowedSerdesTest {
final Windowed<Integer> windowed = sessionWindowedSerde.deserializer().deserialize(topic, bytes);
Assert.assertEquals(sessionWindowed, windowed);
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index d98fd79..b360cec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -96,7 +96,7 @@ public class AbstractStreamTest {
assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length);
}
- private class ExtendedKStream<K, V> extends AbstractStream<K> {
+ private class ExtendedKStream<K, V> extends AbstractStream<K, V> {
ExtendedKStream(final KStream<K, V> stream) {
super((KStreamImpl<K, V>) stream);
@@ -104,11 +104,12 @@ public class AbstractStreamTest {
KStream<K, V> randomFilter() {
final String name = builder.newProcessorName("RANDOM-FILTER-");
- final ProcessorGraphNode processorNode = new ProcessorGraphNode(name,
- new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
- false);
+ final ProcessorGraphNode<K, V> processorNode = new ProcessorGraphNode<>(
+ name,
+ new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
+ false);
builder.addGraphNode(this.streamsGraphNode, processorNode);
- return new KStreamImpl<>(builder, name, sourceNodes, false, processorNode);
+ return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, processorNode, builder);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index bce7fc8..119a7b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
@@ -32,12 +33,17 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -76,6 +82,8 @@ public class KStreamImplTest {
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ private Serde<String> mySerde = new Serdes.StringSerde();
+
@Before
public void before() {
builder = new StreamsBuilder();
@@ -181,6 +189,121 @@ public class KStreamImplTest {
}
@Test
+ public void shouldPreserveSerdesForOperators() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> stream1 = builder.stream(Collections.singleton("topic-1"), stringConsumed);
+ final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
+ final GlobalKTable<String, String> table2 = builder.globalTable("topic-2", stringConsumed);
+ final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed);
+
+ final KeyValueMapper<String, String, String> selector = (key, value) -> key;
+ final KeyValueMapper<String, String, Iterable<KeyValue<String, String>>> flatSelector = (key, value) -> Collections.singleton(new KeyValue<>(key, value));
+ final ValueMapper<String, String> mapper = value -> value;
+ final ValueMapper<String, Iterable<String>> flatMapper = Collections::singleton;
+ final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
+ final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() {
+ @Override
+ public void init(final ProcessorContext context) {}
+
+ @Override
+ public KeyValue<String, String> transform(final String key, final String value) {
+ return new KeyValue<>(key, value);
+ }
+
+ @Override
+ public void close() {}
+ };
+ final ValueTransformerSupplier<String, String> valueTransformerSupplier = () -> new ValueTransformer<String, String>() {
+ @Override
+ public void init(final ProcessorContext context) {}
+
+ @Override
+ public String transform(final String value) {
+ return value;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+
+ assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+
+ assertNull(((AbstractStream) stream1.selectKey(selector)).keySerde());
+ assertEquals(((AbstractStream) stream1.selectKey(selector)).valueSerde(), consumedInternal.valueSerde());
+
+ assertNull(((AbstractStream) stream1.map(KeyValue::new)).keySerde());
+ assertNull(((AbstractStream) stream1.map(KeyValue::new)).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.mapValues(mapper)).keySerde(), consumedInternal.keySerde());
+ assertNull(((AbstractStream) stream1.mapValues(mapper)).valueSerde());
+
+ assertNull(((AbstractStream) stream1.flatMap(flatSelector)).keySerde());
+ assertNull(((AbstractStream) stream1.flatMap(flatSelector)).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde());
+ assertNull(((AbstractStream) stream1.flatMapValues(flatMapper)).valueSerde());
+
+ assertNull(((AbstractStream) stream1.transform(transformerSupplier)).keySerde());
+ assertNull(((AbstractStream) stream1.transform(transformerSupplier)).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).keySerde(), consumedInternal.keySerde());
+ assertNull(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).valueSerde());
+
+ assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
+ assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+ assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+ assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).valueSerde(), null);
+ assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+ assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).valueSerde(), null);
+
+ assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde(), null);
+ }
+
+ @Test
public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 2692c17..10a27b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
@@ -43,20 +42,11 @@ public class KStreamMapTest {
@Test
public void testMap() {
final StreamsBuilder builder = new StreamsBuilder();
-
- final KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper =
- new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(final Integer key, final String value) {
- return KeyValue.pair(value, key);
- }
- };
-
final int[] expectedKeys = new int[]{0, 1, 2, 3};
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
- stream.map(mapper).process(supplier);
+ stream.map((key, value) -> KeyValue.pair(value, key)).process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
@@ -75,16 +65,9 @@ public class KStreamMapTest {
@Test
public void testTypeVariance() {
- final KeyValueMapper<Number, Object, KeyValue<Number, String>> stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() {
- @Override
- public KeyValue<Number, String> apply(final Number key, final Object value) {
- return KeyValue.pair(key, key + ":" + value);
- }
- };
-
new StreamsBuilder()
.<Integer, String>stream("numbers")
- .map(stringify)
+ .map((key, value) -> KeyValue.pair(key, key + ":" + value))
.to("strings");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index eb586e6..6e666c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
@@ -28,13 +30,17 @@ import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -62,6 +68,7 @@ import static org.junit.Assert.assertNull;
public class KTableImplTest {
+ private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@@ -70,6 +77,8 @@ public class KTableImplTest {
private StreamsBuilder builder;
private KTable<String, String> table;
+ private Serde<String> mySerde = new Serdes.StringSerde();
+
@Before
public void setUp() {
builder = new StreamsBuilder();
@@ -126,6 +135,74 @@ public class KTableImplTest {
}
@Test
+ public void shouldPreserveSerdesForOperators() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
+ final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed);
+
+ final KeyValueMapper<String, String, String> selector = (key, value) -> key;
+ final ValueMapper<String, String> mapper = value -> value;
+ final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
+ final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String, String, String>() {
+ @Override
+ public void init(final ProcessorContext context) {}
+
+ @Override
+ public String transform(final String key, final String value) {
+ return value;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ assertEquals(((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+ assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde());
+ assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+ assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde());
+ assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+ assertEquals(((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde());
+
+ assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde());
+ assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+ assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null);
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null);
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null);
+ assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde(), null);
+ assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+ assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+ assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde(), null);
+ assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+ assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+ }
+
+ @Test
public void testValueGetter() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index d65f27e..75e9f51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -38,9 +38,7 @@ import static org.junit.Assert.assertEquals;
public class StreamsGraphTest {
- final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
-
-
+ private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
// Test builds topology in succesive manner but only graph node not yet processed written to topology