You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/16 18:47:37 UTC
[kafka] branch trunk updated: KAFKA-4218: Enable access to key in
ValueTransformer and ValueMapper
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bcc712b KAFKA-4218: Enable access to key in ValueTransformer and ValueMapper
bcc712b is described below
commit bcc712b45820da74b44209857ebbf7b9d59e0ed7
Author: Jeyhun Karimov <je...@gmail.com>
AuthorDate: Tue Jan 16 10:47:29 2018 -0800
KAFKA-4218: Enable access to key in ValueTransformer and ValueMapper
This PR is the partial implementation for KIP-149. As the discussion for this KIP is still ongoing, I made a PR on the "safe" portions of the KIP (so that it can be included in the next release) which are 1) `ValueMapperWithKey`, 2) `ValueTransformerWithKeySupplier`, and 3) `ValueTransformerWithKey`.
Author: Jeyhun Karimov <je...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4309 from jeyhunkarimov/KIP-149_hope_last
---
.../org/apache/kafka/streams/kstream/KStream.java | 203 ++++++++++++++++++---
.../org/apache/kafka/streams/kstream/KTable.java | 88 ++++++++-
.../apache/kafka/streams/kstream/ValueMapper.java | 4 +
.../{ValueMapper.java => ValueMapperWithKey.java} | 29 +--
.../kafka/streams/kstream/ValueTransformer.java | 2 +
.../streams/kstream/ValueTransformerSupplier.java | 3 +
...ansformer.java => ValueTransformerWithKey.java} | 64 +++----
...r.java => ValueTransformerWithKeySupplier.java} | 16 +-
.../streams/kstream/internals/AbstractStream.java | 78 ++++++++
.../InternalValueTransformerWithKey.java} | 23 +--
.../InternalValueTransformerWithKeySupplier.java} | 26 +--
.../kstream/internals/KStreamFlatMapValues.java | 8 +-
.../streams/kstream/internals/KStreamImpl.java | 29 ++-
.../kstream/internals/KStreamMapValues.java | 12 +-
.../kstream/internals/KStreamTransformValues.java | 12 +-
.../streams/kstream/internals/KTableImpl.java | 43 +++--
.../streams/kstream/internals/KTableMapValues.java | 16 +-
.../internals/KStreamFlatMapValuesTest.java | 49 +++--
.../streams/kstream/internals/KStreamImplTest.java | 24 ++-
.../kstream/internals/KStreamMapValuesTest.java | 35 +++-
.../internals/KStreamTransformValuesTest.java | 89 +++++++--
.../streams/kstream/internals/KTableImplTest.java | 8 +-
22 files changed, 654 insertions(+), 207 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0d1d201..ddaa61e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -108,7 +108,9 @@ public interface KStream<K, V> {
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
*/
<KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
@@ -142,9 +144,12 @@ public interface KStream<K, V> {
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
@@ -176,12 +181,50 @@ public interface KStream<K, V> {
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
/**
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation (cf.
+ * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+ * <p>
+ * The example below counts the number of tokens of key and value strings.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer> {
+ * Integer apply(String readOnlyKey, String value) {
+ * return readOnlyKey.split(" ").length + value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
+
+ /**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
@@ -220,9 +263,12 @@ public interface KStream<K, V> {
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
@@ -260,12 +306,61 @@ public interface KStream<K, V> {
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
/**
+ * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
+ * with the same key in the new stream.
+ * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+ * stream (value type can be altered arbitrarily).
+ * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+ * for stateful value transformation).
+ * <p>
+ * The example below splits input records {@code <Integer:String>}, with key=1, containing sentences as values
+ * into their words.
+ * <pre>{@code
+ * KStream<Integer, String> inputStream = builder.stream("topic");
+ * KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>> {
+ * Iterable<String> apply(Integer readOnlyKey, String value) {
+ * if(readOnlyKey == 1) {
+ * return Arrays.asList(value.split(" "));
+ * } else {
+ * return Arrays.asList(value);
+ * }
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes the new output values
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
+
+ /**
* Print the records of this stream to {@code System.out}.
* This function will use the generated name of the parent processor node to label the key/value pairs printed to
* the console.
@@ -884,14 +979,12 @@ public interface KStream<K, V> {
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
- * StateStoreSupplier myStore = Stores.create("myTransformState")
- * .withKeys(...)
- * .withValues(...)
- * .persistent() // optional
- * .build();
- *
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
* // register store
- * builder.addStore(myStore);
+ * builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
* }</pre>
@@ -942,6 +1035,7 @@ public interface KStream<K, V> {
* @return a {@code KStream} that contains more or less records with new key and value (possibly of different type)
* @see #flatMap(KeyValueMapper)
* @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #process(ProcessorSupplier, String...)
*/
<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
@@ -959,14 +1053,12 @@ public interface KStream<K, V> {
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
- * StateStoreSupplier myStore = Stores.create("myValueTransformState")
- * .withKeys(...)
- * .withValues(...)
- * .persistent() // optional
- * .build();
- *
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
* // register store
- * builder.addStore(myStore);
+ * builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
* }</pre>
@@ -1012,12 +1104,83 @@ public interface KStream<K, V> {
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
*/
<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames);
/**
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
+ * record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
+ * periodic actions can be performed.
+ * <p>
+ * In order to assign a state, the state must be created and registered beforehand:
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * <p>
+ * Within the {@link ValueTransformerWithKey}, the state is obtained via the
+ * {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+ * pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
+ * ProcessorContext.forward()}.
+ * <pre>{@code
+ * new ValueTransformerWithKeySupplier() {
+ * ValueTransformerWithKey get() {
+ * return new ValueTransformerWithKey() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+ * }
+ *
+ * NewValueType transform(K readOnlyKey, V value) {
+ * // can access this.state and use read-only key
+ * return new NewValueType(readOnlyKey); // or null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+ * {@link ValueTransformerWithKey}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+ final String... stateStoreNames);
+
+ /**
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
* {@link ProcessorSupplier}).
* This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
@@ -1028,14 +1191,12 @@ public interface KStream<K, V> {
* In order to assign a state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
- * StateStoreSupplier myStore = Stores.create("myProcessorState")
- * .withKeys(...)
- * .withValues(...)
- * .persistent() // optional
- * .build();
- *
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+ * Serdes.String(),
+ * Serdes.String());
* // register store
- * builder.addStore(myStore);
+ * builder.addStateStore(keyValueStoreBuilder);
*
* inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
* }</pre>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 33e56aa..3290150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -345,7 +345,7 @@ public interface KTable<K, V> {
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
- * (with possible new type)in the new {@code KTable}.
+ * (with possible new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
* computes a new value for it, resulting in an update record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -379,7 +379,42 @@ public interface KTable<K, V> {
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
- * (with possible new type)in the new {@code KTable}.
+ * (with possible new type) in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
+ * record and computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below counts the number of tokens of value and key strings.
+ * <pre>{@code
+ * KTable<String, String> inputTable = builder.table("topic");
+ * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer> {
+ * Integer apply(String readOnlyKey, String value) {
+ * return readOnlyKey.split(" ").length + value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * This operation preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ * <p>
+ * Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapperWithKey)
+ * record stream mapValues}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+ * @param <VR> the value type of the result {@code KTable}
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
+
+ /**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
* computes a new value for it, resulting in an update record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -423,7 +458,52 @@ public interface KTable<K, V> {
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
- * (with possible new type)in the new {@code KTable}.
+ * (with possible new type) in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
+ * record and computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below counts the number of tokens of value and key strings.
+ * <pre>{@code
+ * KTable<String, String> inputTable = builder.table("topic");
+ * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer> {
+ * Integer apply(String readOnlyKey, String value) {
+ * return readOnlyKey.split(" ").length + value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * <p>
+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * This operation preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ * <p>
+ * Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapperWithKey)
+ * record stream mapValues}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized. Cannot be {@code null}
+ * @param <VR> the value type of the result {@code KTable}
+ *
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
* computes a new value for it, resulting in an update record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -471,7 +551,7 @@ public interface KTable<K, V> {
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
- * (with possible new type)in the new {@code KTable}.
+ * (with possible new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
* computes a new value for it, resulting in an update record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 51396b5..be550a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -29,9 +29,13 @@ package org.apache.kafka.streams.kstream;
* @param <VR> mapped value type
* @see KeyValueMapper
* @see ValueTransformer
+ * @see ValueTransformerWithKey
* @see KStream#mapValues(ValueMapper)
+ * @see KStream#mapValues(ValueMapperWithKey)
* @see KStream#flatMapValues(ValueMapper)
+ * @see KStream#flatMapValues(ValueMapperWithKey)
* @see KTable#mapValues(ValueMapper)
+ * @see KTable#mapValues(ValueMapperWithKey)
*/
public interface ValueMapper<V, VR> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
similarity index 57%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
index 51396b5..b20c61a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
@@ -16,30 +16,37 @@
*/
package org.apache.kafka.streams.kstream;
-
/**
- * The {@code ValueMapper} interface for mapping a value to a new value of arbitrary type.
- * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} is invoked individually for each record
- * of a stream (cf. {@link ValueTransformer} for stateful value transformation).
- * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's
- * key is preserved.
+ * The {@code ValueMapperWithKey} interface for mapping a value to a new value of arbitrary type.
+ * This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each
+ * record of a stream (cf. {@link ValueTransformer} for stateful value transformation).
+ * If {@code ValueMapperWithKey} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the
+ * record's key is preserved.
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* If a record's key and value should be modified {@link KeyValueMapper} can be used.
*
+ * @param <K> key type
* @param <V> value type
* @param <VR> mapped value type
* @see KeyValueMapper
* @see ValueTransformer
+ * @see ValueTransformerWithKey
* @see KStream#mapValues(ValueMapper)
+ * @see KStream#mapValues(ValueMapperWithKey)
* @see KStream#flatMapValues(ValueMapper)
+ * @see KStream#flatMapValues(ValueMapperWithKey)
* @see KTable#mapValues(ValueMapper)
+ * @see KTable#mapValues(ValueMapperWithKey)
*/
-public interface ValueMapper<V, VR> {
+
+public interface ValueMapperWithKey<K, V, VR> {
/**
- * Map the given value to a new value.
+ * Map the given key and value to a new value.
*
- * @param value the value to be mapped
+ * @param readOnlyKey the read-only key
+ * @param value the value to be mapped
* @return the new value
*/
- VR apply(final V value);
-}
+ VR apply(final K readOnlyKey, final V value);
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 0a8e890..1802a61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -40,7 +40,9 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
* @param <V> value type
* @param <VR> transformed value type
* @see ValueTransformerSupplier
+ * @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
*/
public interface ValueTransformer<V, VR> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 78234ae..10cf543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -23,7 +23,10 @@ package org.apache.kafka.streams.kstream;
* @param <V> value type
* @param <VR> transformed value type
* @see ValueTransformer
+ * @see ValueTransformerWithKey
+ * @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 0a8e890..128c61f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -22,28 +22,34 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
/**
- * The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
- * This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each
+ * The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
+ * This is a stateful record-by-record operation, i.e., {@link #transform(Object, Object)} is invoked individually for each
* record of a stream and can access and modify a state that is available beyond a single call of
- * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation).
- * Additionally, this {@code ValueTransformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule}
- * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context.
- * If {@code ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved.
+ * {@link #transform(Object, Object)} (cf. {@link ValueMapper} for stateless value transformation).
+ * Additionally, this {@code ValueTransformerWithKey} can
+ * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
+ * {@link Punctuator#punctuate(long) called periodically} with the provided context.
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * If {@code ValueTransformerWithKey} is applied to a {@link KeyValue} pair record the record's key is preserved.
* <p>
- * Use {@link ValueTransformerSupplier} to provide new instances of {@code ValueTransformer} to Kafka Stream's runtime.
+ * Use {@link ValueTransformerWithKeySupplier} to provide new instances of {@code ValueTransformerWithKey} to
+ * Kafka Stream's runtime.
* <p>
* If a record's key and value should be modified {@link Transformer} can be used.
*
+ * @param <K> key type
* @param <V> value type
* @param <VR> transformed value type
- * @see ValueTransformerSupplier
+ * @see ValueTransformer
+ * @see ValueTransformerWithKeySupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
*/
-public interface ValueTransformer<V, VR> {
+
+public interface ValueTransformerWithKey<K, V, VR> {
/**
* Initialize this transformer.
@@ -54,12 +60,12 @@ public interface ValueTransformer<V, VR> {
* {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
* <p>
* Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
- * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
+ * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
* <p>
* Note that using {@link ProcessorContext#forward(Object, Object)},
* {@link ProcessorContext#forward(Object, Object, int)}, or
* {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
- * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
+ * {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}.
*
* @param context the context
* @throws IllegalStateException If store gets registered after initialization is already finished
@@ -68,8 +74,8 @@ public interface ValueTransformer<V, VR> {
void init(final ProcessorContext context);
/**
- * Transform the given value to a new value.
- * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...)
+ * Transform the given key and value to a new value.
+ * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
@@ -78,32 +84,11 @@ public interface ValueTransformer<V, VR> {
* {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
* will result in an {@link StreamsException exception}.
*
- * @param value the value to be transformed
+ * @param readOnlyKey the read-only key
+ * @param value the value to be transformed
* @return the new value
*/
- VR transform(final V value);
-
- /**
- * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedule itself} with
- * the context during {@link #init(ProcessorContext) initialization}.
- * <p>
- * It is not possible to return any new output records within {@code punctuate}.
- * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
- * or {@link ProcessorContext#forward(Object, Object, String)} will result in an
- * {@link StreamsException exception}.
- * Furthermore, {@code punctuate} must return {@code null}.
- * <p>
- * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to
- * timestamps return by the used {@link TimestampExtractor})
- * and not based on wall-clock time.
- *
- * @deprecated Please use {@link Punctuator} functional interface instead.
- *
- * @param timestamp the stream time when {@code punctuate} is being called
- * @return must return {@code null}—otherwise, an {@link StreamsException exception} will be thrown
- */
- @Deprecated
- VR punctuate(final long timestamp);
+ VR transform(final K readOnlyKey, final V value);
/**
* Close this processor and clean up any resources.
@@ -113,5 +98,4 @@ public interface ValueTransformer<V, VR> {
* or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
*/
void close();
-
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
similarity index 76%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
index 78234ae..766c84e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
@@ -16,24 +16,18 @@
*/
package org.apache.kafka.streams.kstream;
-
/**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
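+ * A {@code ValueTransformerWithKeySupplier} interface which can create one or more {@link ValueTransformerWithKey} instances.
+ *
+ * @param <K> key type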
* @param <V> value type
* @param <VR> transformed value type
* @see ValueTransformer
+ * @see ValueTransformerWithKey
* @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
*/
-public interface ValueTransformerSupplier<V, VR> {
-
- /**
- * Return a new {@link ValueTransformer} instance.
- *
- * @return a new {@link ValueTransformer} instance.
- */
- ValueTransformer<V, VR> get();
+public interface ValueTransformerWithKeySupplier<K, V, VR> {
+ ValueTransformerWithKey<K, V, VR> get();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 26e404e..51fe820 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -18,9 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
@@ -111,5 +119,75 @@ public abstract class AbstractStream<K> {
.enableCaching();
}
+    /**
+     * Wraps a key-agnostic {@link ValueMapper} as a {@link ValueMapperWithKey}.
+     * The returned mapper ignores the key it is handed and applies the wrapped mapper to the value only.
+     *
+     * @param valueMapper the mapper to wrap; must not be {@code null}
+     * @param <K> key type (unused by the wrapped mapper)
+     * @param <V> value type
+     * @param <VR> mapped value type
+     * @return a {@link ValueMapperWithKey} delegating to {@code valueMapper}
+     * @throws NullPointerException if {@code valueMapper} is {@code null}
+     */
+    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
+        final ValueMapper<V, VR> keylessMapper = Objects.requireNonNull(valueMapper, "valueMapper can't be null");
+        final ValueMapperWithKey<K, V, VR> keyIgnoringMapper = new ValueMapperWithKey<K, V, VR>() {
+            @Override
+            public VR apply(final K readOnlyKey, final V value) {
+                // The key is deliberately dropped: the wrapped mapper only understands values.
+                return keylessMapper.apply(value);
+            }
+        };
+        return keyIgnoringMapper;
+    }
+    /**
+     * Adapts a key-agnostic {@link ValueTransformerSupplier} to an {@link InternalValueTransformerWithKeySupplier}.
+     * <p>
+     * The wrapped supplier is asked for a {@link ValueTransformer} on every call to {@code get()}, so each
+     * returned adapter delegates to its own transformer rather than all adapters sharing a single instance.
+     *
+     * @param valueTransformerSupplier the supplier to wrap; must not be {@code null}
+     * @param <K> key type (the key is not passed on to the wrapped transformer)
+     * @param <V> value type
+     * @param <VR> transformed value type
+     * @return a supplier whose transformers forward {@code init}, {@code transform} (without the key),
+     *         {@code punctuate} and {@code close} to a {@link ValueTransformer} obtained from the wrapped supplier
+     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
+     */
+    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+            @Override
+            public InternalValueTransformerWithKey<K, V, VR> get() {
+                // Obtain the transformer here, per get() call, instead of once up front:
+                // a single eagerly created instance would be shared by every adapter this supplier hands out.
+                final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
+                return new InternalValueTransformerWithKey<K, V, VR>() {
+                    @Override
+                    public VR punctuate(final long timestamp) {
+                        return valueTransformer.punctuate(timestamp);
+                    }
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        valueTransformer.init(context);
+                    }
+
+                    @Override
+                    public VR transform(final K readOnlyKey, final V value) {
+                        // The wrapped transformer is key-agnostic, so the key is dropped.
+                        return valueTransformer.transform(value);
+                    }
+
+                    @Override
+                    public void close() {
+                        valueTransformer.close();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Adapts a {@link ValueTransformerWithKeySupplier} to an {@link InternalValueTransformerWithKeySupplier}.
+     * <p>
+     * The wrapped supplier is asked for a {@link ValueTransformerWithKey} on every call to {@code get()}, so each
+     * returned adapter delegates to its own transformer rather than all adapters sharing a single instance.
+     * {@code punctuate} on the returned adapters always throws a {@link StreamsException}.
+     *
+     * @param valueTransformerWithKeySupplier the supplier to wrap; must not be {@code null}
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VR> transformed value type
+     * @return a supplier whose transformers forward {@code init}, {@code transform} and {@code close} to a
+     *         {@link ValueTransformerWithKey} obtained from the wrapped supplier
+     * @throws NullPointerException if {@code valueTransformerWithKeySupplier} is {@code null}
+     */
+    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) {
+        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null");
+        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+            @Override
+            public InternalValueTransformerWithKey<K, V, VR> get() {
+                // Obtain the transformer here, per get() call, instead of once up front:
+                // a single eagerly created instance would be shared by every adapter this supplier hands out.
+                final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get();
+                return new InternalValueTransformerWithKey<K, V, VR>() {
+                    @Override
+                    public VR punctuate(final long timestamp) {
+                        throw new StreamsException("ValueTransformerWithKey#punctuate should not be called.");
+                    }
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        valueTransformerWithKey.init(context);
+                    }
+
+                    @Override
+                    public VR transform(final K readOnlyKey, final V value) {
+                        return valueTransformerWithKey.transform(readOnlyKey, value);
+                    }
+
+                    @Override
+                    public void close() {
+                        valueTransformerWithKey.close();
+                    }
+                };
+            }
+        };
+    }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
similarity index 55%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
index 78234ae..636e409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
@@ -14,26 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream;
+package org.apache.kafka.streams.kstream.internals;
-/**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
- * @param <V> value type
- * @param <VR> transformed value type
- * @see ValueTransformer
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see Transformer
- * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)
- */
-public interface ValueTransformerSupplier<V, VR> {
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
- /**
- * Return a new {@link ValueTransformer} instance.
- *
- * @return a new {@link ValueTransformer} instance.
- */
- ValueTransformer<V, VR> get();
+/**
+ * Internal extension of {@link ValueTransformerWithKey} that additionally declares {@code punctuate(long)}.
+ * <p>
+ * NOTE(review): the adapters built in {@code AbstractStream} implement {@code punctuate} either by forwarding to
+ * {@code ValueTransformer#punctuate(long)} or by throwing a {@code StreamsException}; the method appears to exist
+ * only to bridge that older callback - confirm before relying on it elsewhere.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @param <VR> transformed value type
+ */
+public interface InternalValueTransformerWithKey<K, V, VR> extends ValueTransformerWithKey<K, V, VR> {
+ VR punctuate(final long timestamp);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
similarity index 55%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
index 78234ae..3418e71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
@@ -14,26 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.kstream;
+package org.apache.kafka.streams.kstream.internals;
-
-/**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
- * @param <V> value type
- * @param <VR> transformed value type
- * @see ValueTransformer
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see Transformer
- * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)
- */
-public interface ValueTransformerSupplier<V, VR> {
-
- /**
- * Return a new {@link ValueTransformer} instance.
- *
- * @return a new {@link ValueTransformer} instance.
- */
- ValueTransformer<V, VR> get();
-}
+/**
+ * Internal supplier of {@link InternalValueTransformerWithKey} instances.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @param <VR> transformed value type
+ */
+public interface InternalValueTransformerWithKeySupplier<K, V, VR> {
+ InternalValueTransformerWithKey<K, V, VR> get();
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index ab9d227..7d0d270 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
- private final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper;
+ private final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper;
- KStreamFlatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+ KStreamFlatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper) {
this.mapper = mapper;
}
@@ -37,7 +37,7 @@ class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
- Iterable<? extends V1> newValues = mapper.apply(value);
+ final Iterable<? extends V1> newValues = mapper.apply(key, value);
for (V1 v : newValues) {
context().forward(key, v);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8e80315..db4d238 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -35,7 +35,9 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -176,6 +178,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+ return mapValues(withKey(mapper));
+ }
+
+ @Override
+ public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
String name = builder.newProcessorName(MAPVALUES_NAME);
@@ -329,6 +336,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+ return flatMapValues(withKey(mapper));
+ }
+
+ @Override
+ public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
String name = builder.newProcessorName(FLATMAPVALUES_NAME);
@@ -514,9 +526,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
- String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
- builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+ return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
+ }
+
+ @Override
+ public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
+
+ return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
+ }
+
+ private <VR> KStream<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier,
+ final String... stateStoreNames) {
+ final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+ builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(internalValueTransformerWithKeySupplier), this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 8897a6c..28c120e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
- private final ValueMapper<V, V1> mapper;
+ private final ValueMapperWithKey<K, V, V1> mapper;
- public KStreamMapValues(ValueMapper<V, V1> mapper) {
+ public KStreamMapValues(final ValueMapperWithKey<K, V, V1> mapper) {
this.mapper = mapper;
}
@@ -36,9 +36,9 @@ class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(final K key, final V value) {
- V1 newValue = mapper.apply(value);
- context().forward(key, newValue);
+ public void process(final K readOnlyKey, final V value) {
+ final V1 newValue = mapper.apply(readOnlyKey, value);
+ context().forward(readOnlyKey, newValue);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 55c16cc..ace4f69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -19,8 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -36,9 +34,9 @@ import java.util.Map;
public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
- private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
+ private final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
- public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
+ public KStreamTransformValues(final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) {
this.valueTransformerSupplier = valueTransformerSupplier;
}
@@ -49,10 +47,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
- private final ValueTransformer<V, R> valueTransformer;
+ private final InternalValueTransformerWithKey<K, V, R> valueTransformer;
private ProcessorContext context;
- public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
+ public KStreamTransformValuesProcessor(final InternalValueTransformerWithKey<K, V, R> valueTransformer) {
this.valueTransformer = valueTransformer;
}
@@ -168,7 +166,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
@Override
public void process(K key, V value) {
- context.forward(key, valueTransformer.transform(value));
+ context.forward(key, valueTransformer.transform(key, value));
}
@SuppressWarnings("deprecation")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3bc6f4b..4019039 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -250,7 +251,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@SuppressWarnings("deprecation")
- private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
+ private <V1> KTable<K, V1> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(mapper);
@@ -271,24 +272,37 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
- return mapValues(mapper, null, (String) null);
+ return doMapValues(withKey(mapper), null, null);
+ }
+
+ @Override
+ public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
+ return doMapValues(mapper, null, null);
+
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ return mapValues(withKey(mapper), materialized);
+ }
+
+ @Override
+ public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME);
final String name = builder.newProcessorName(MAPVALUES_NAME);
- final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
- mapper,
- materializedInternal.storeName());
+ final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(
+ this,
+ mapper,
+ materializedInternal.storeName());
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
- builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal)
- .materialize(),
- name);
+ builder.internalTopologyBuilder.addStateStore(
+ new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ name);
return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
}
@@ -297,11 +311,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,
final String queryableStoreName) {
- org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
- if (queryableStoreName != null) {
- storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
- }
- return doMapValues(mapper, valueSerde, storeSupplier);
+ return mapValues(withKey(mapper), Materialized.<K, V1, KeyValueStore<Bytes, byte[]>>as(queryableStoreName).
+ withValueSerde(valueSerde).withKeySerde(this.keySerde));
}
@SuppressWarnings("deprecation")
@@ -310,7 +321,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Serde<V1> valueSerde,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return doMapValues(mapper, valueSerde, storeSupplier);
+ return doMapValues(withKey(mapper), valueSerde, storeSupplier);
}
@SuppressWarnings("deprecation")
@@ -545,9 +556,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public KStream<K, V> toStream() {
String name = builder.newProcessorName(TOSTREAM_NAME);
- builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
+ builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
@Override
- public V apply(Change<V> change) {
+ public V apply(final K key, final Change<V> change) {
return change.newValue;
}
}), this.name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 41dd7cd..03fa3a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,11 +26,11 @@ import org.apache.kafka.streams.state.KeyValueStore;
class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
- private final ValueMapper<? super V, ? extends V1> mapper;
+ private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper;
private final String queryableName;
private boolean sendOldValues = false;
- public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapper<? super V, ? extends V1> mapper,
+ public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper,
final String queryableName) {
this.parent = parent;
this.mapper = mapper;
@@ -65,11 +65,11 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
sendOldValues = true;
}
- private V1 computeValue(V value) {
+ private V1 computeValue(final K key, final V value) {
V1 newValue = null;
if (value != null)
- newValue = mapper.apply(value);
+ newValue = mapper.apply(key, value);
return newValue;
}
@@ -91,8 +91,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
@Override
public void process(K key, Change<V> change) {
- V1 newValue = computeValue(change.newValue);
- V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+ final V1 newValue = computeValue(key, change.newValue);
+ final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
if (queryableName != null) {
store.put(key, newValue);
@@ -118,7 +118,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
@Override
public V1 get(K key) {
- return computeValue(parentGetter.get(key));
+ return computeValue(key, parentGetter.get(key));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 59ab0ff..ecfa3aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
@@ -28,7 +29,7 @@ import org.junit.Test;
import java.util.ArrayList;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
public class KStreamFlatMapValuesTest {
@@ -54,24 +55,50 @@ public class KStreamFlatMapValuesTest {
final int[] expectedKeys = {0, 1, 2, 3};
- KStream<Integer, Integer> stream;
- MockProcessorSupplier<Integer, String> processor;
-
- processor = new MockProcessorSupplier<>();
- stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+ final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+ final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
stream.flatMapValues(mapper).process(processor);
driver.setUp(builder);
- for (int expectedKey : expectedKeys) {
+ for (final int expectedKey : expectedKeys) {
driver.process(topicName, expectedKey, expectedKey);
}
- assertEquals(8, processor.processed.size());
-
String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ assertArrayEquals(expected, processor.processed.toArray());
+ }
+
+
+ @Test
+ public void testFlatMapValuesWithKeys() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ ValueMapperWithKey<Integer, Number, Iterable<String>> mapper =
+ new ValueMapperWithKey<Integer, Number, Iterable<String>>() {
+ @Override
+ public Iterable<String> apply(final Integer readOnlyKey, final Number value) {
+ ArrayList<String> result = new ArrayList<>();
+ result.add("v" + value);
+ result.add("k" + readOnlyKey);
+ return result;
+ }
+ };
+
+ final int[] expectedKeys = {0, 1, 2, 3};
+
+ final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+ final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+
+ stream.flatMapValues(mapper).process(processor);
+
+ driver.setUp(builder);
+ for (final int expectedKey : expectedKeys) {
+ driver.process(topicName, expectedKey, expectedKey);
}
+
+ String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
+
+ assertArrayEquals(expected, processor.processed.toArray());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 562711d..2009806 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -283,7 +286,12 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnMapValues() {
- testStream.mapValues(null);
+ testStream.mapValues((ValueMapper) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnMapValuesWithKey() {
+ testStream.mapValues((ValueMapperWithKey) null);
}
@Test(expected = NullPointerException.class)
@@ -303,7 +311,12 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnFlatMapValues() {
- testStream.flatMapValues(null);
+ testStream.flatMapValues((ValueMapper) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
+ testStream.flatMapValues((ValueMapperWithKey) null);
}
@Test(expected = IllegalArgumentException.class)
@@ -333,7 +346,12 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTransformSupplierOnTransformValues() {
- testStream.transformValues(null);
+ testStream.transformValues((ValueTransformerSupplier) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
+ testStream.transformValues((ValueTransformerWithKeySupplier) null);
}
@Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index e4bf23e..2cedfb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -22,12 +22,13 @@ import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
public class KStreamMapValuesTest {
@@ -61,13 +62,37 @@ public class KStreamMapValuesTest {
for (int expectedKey : expectedKeys) {
driver.process(topicName, expectedKey, Integer.toString(expectedKey));
}
+ String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
- assertEquals(4, processor.processed.size());
+ assertArrayEquals(expected, processor.processed.toArray());
+ }
- String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
+ @Test
+ public void testMapValuesWithKeys() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ ValueMapperWithKey<Integer, CharSequence, Integer> mapper =
+ new ValueMapperWithKey<Integer, CharSequence, Integer>() {
+ @Override
+ public Integer apply(final Integer readOnlyKey, final CharSequence value) {
+ return value.length() + readOnlyKey;
+ }
+ };
+
+ final int[] expectedKeys = {1, 10, 100, 1000};
+
+ KStream<Integer, String> stream;
+ MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+ stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+ stream.mapValues(mapper).process(processor);
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ driver.setUp(builder);
+ for (int expectedKey : expectedKeys) {
+ driver.process(topicName, expectedKey, Integer.toString(expectedKey));
}
+ String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
+
+ assertArrayEquals(expected, processor.processed.toArray());
}
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 16f121e..1b34fab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -23,7 +23,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.test.KStreamTestDriver;
@@ -31,8 +33,8 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
public class KStreamTransformValuesTest {
@@ -86,22 +88,82 @@ public class KStreamTransformValuesTest {
for (int expectedKey : expectedKeys) {
driver.process(topicName, expectedKey, expectedKey * 10);
}
+ String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
- assertEquals(4, processor.processed.size());
+ assertArrayEquals(expected, processor.processed.toArray());
+ }
- String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+ @Test
+ public void testTransformWithKey() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ ValueTransformerWithKeySupplier<Integer, Number, Integer> valueTransformerSupplier =
+ new ValueTransformerWithKeySupplier<Integer, Number, Integer>() {
+ public ValueTransformerWithKey<Integer, Number, Integer> get() {
+ return new ValueTransformerWithKey<Integer, Number, Integer>() {
+ private int total = 0;
+ @Override
+ public void init(final ProcessorContext context) {
+
+ }
+ @Override
+ public Integer transform(final Integer readOnlyKey, final Number value) {
+ total += value.intValue() + readOnlyKey;
+ return total;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+ };
+
+ final int[] expectedKeys = {1, 10, 100, 1000};
+
+ KStream<Integer, Integer> stream;
+ MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+ stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ stream.transformValues(valueTransformerSupplier).process(processor);
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], processor.processed.get(i));
+ driver.setUp(builder);
+ for (int expectedKey : expectedKeys) {
+ driver.process(topicName, expectedKey, expectedKey * 10);
}
+ String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
+
+ assertArrayEquals(expected, processor.processed.toArray());
}
+
@Test
public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
- final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new ValueTransformerSupplier<Integer, Integer>() {
+ final BadValueTransformer badValueTransformer = new BadValueTransformer();
+ final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new InternalValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
@Override
- public ValueTransformer<Integer, Integer> get() {
- return new BadValueTransformer();
+ public InternalValueTransformerWithKey<Integer, Integer, Integer> get() {
+ return new InternalValueTransformerWithKey<Integer, Integer, Integer>() {
+ @Override
+ public Integer punctuate(long timestamp) {
+ throw new StreamsException("ValueTransformerWithKey#punctuate should not be called.");
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ badValueTransformer.init(context);
+ }
+
+ @Override
+ public Integer transform(final Integer readOnlyKey, final Integer value) {
+ return badValueTransformer.transform(readOnlyKey, value);
+ }
+
+ @Override
+ public void close() {
+ badValueTransformer.close();
+ }
+ };
}
});
@@ -137,16 +199,16 @@ public class KStreamTransformValuesTest {
}
}
- private static final class BadValueTransformer implements ValueTransformer<Integer, Integer> {
+ private static final class BadValueTransformer implements ValueTransformerWithKey<Integer, Integer, Integer> {
private ProcessorContext context;
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
this.context = context;
}
@Override
- public Integer transform(Integer value) {
+ public Integer transform(final Integer key, final Integer value) {
if (value == 0) {
context.forward(null, null);
}
@@ -160,11 +222,6 @@ public class KStreamTransformValuesTest {
}
@Override
- public Integer punctuate(long timestamp) {
- return 1; // any not-null falue
- }
-
- @Override
public void close() { }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 9539b45..a7aed2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -393,7 +394,12 @@ public class KTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnMapValues() {
- table.mapValues(null);
+ table.mapValues((ValueMapper) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullMapperOnMapValueWithKey() {
+ table.mapValues((ValueMapperWithKey) null);
}
@SuppressWarnings("deprecation")
--
To stop receiving notification emails like this one, please contact
commits@kafka.apache.org.