Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/16 18:47:37 UTC

[kafka] branch trunk updated: KAFKA-4218: Enable access to key in ValueTransformer and ValueMapper

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bcc712b  KAFKA-4218: Enable access to key in ValueTransformer and ValueMapper
bcc712b is described below

commit bcc712b45820da74b44209857ebbf7b9d59e0ed7
Author: Jeyhun Karimov <je...@gmail.com>
AuthorDate: Tue Jan 16 10:47:29 2018 -0800

    KAFKA-4218: Enable access to key in ValueTransformer and ValueMapper
    
    This PR is a partial implementation of KIP-149. As the discussion for this KIP is still ongoing, it covers only the "safe" portions of the KIP (so that they can be included in the next release): 1) `ValueMapperWithKey`, 2) `ValueTransformerWithKeySupplier`, and 3) `ValueTransformerWithKey`.
    
    Author: Jeyhun Karimov <je...@gmail.com>
    
    Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    
    Closes #4309 from jeyhunkarimov/KIP-149_hope_last
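
    For orientation, a minimal usage sketch of the new key-aware mapValues
    (topic name and types are illustrative only; imports from
    org.apache.kafka.streams.StreamsBuilder and org.apache.kafka.streams.kstream
    are assumed):

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> words = builder.stream("words");
        KStream<String, Integer> lengths = words.mapValues(
            new ValueMapperWithKey<String, String, Integer>() {
                @Override
                public Integer apply(final String readOnlyKey, final String value) {
                    // the key may be read but must never be modified
                    return readOnlyKey.length() + value.length();
                }
            });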
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 203 ++++++++++++++++++---
 .../org/apache/kafka/streams/kstream/KTable.java   |  88 ++++++++-
 .../apache/kafka/streams/kstream/ValueMapper.java  |   4 +
 .../{ValueMapper.java => ValueMapperWithKey.java}  |  29 +--
 .../kafka/streams/kstream/ValueTransformer.java    |   2 +
 .../streams/kstream/ValueTransformerSupplier.java  |   3 +
 ...ansformer.java => ValueTransformerWithKey.java} |  64 +++----
 ...r.java => ValueTransformerWithKeySupplier.java} |  16 +-
 .../streams/kstream/internals/AbstractStream.java  |  78 ++++++++
 .../InternalValueTransformerWithKey.java}          |  23 +--
 .../InternalValueTransformerWithKeySupplier.java}  |  26 +--
 .../kstream/internals/KStreamFlatMapValues.java    |   8 +-
 .../streams/kstream/internals/KStreamImpl.java     |  29 ++-
 .../kstream/internals/KStreamMapValues.java        |  12 +-
 .../kstream/internals/KStreamTransformValues.java  |  12 +-
 .../streams/kstream/internals/KTableImpl.java      |  43 +++--
 .../streams/kstream/internals/KTableMapValues.java |  16 +-
 .../internals/KStreamFlatMapValuesTest.java        |  49 +++--
 .../streams/kstream/internals/KStreamImplTest.java |  24 ++-
 .../kstream/internals/KStreamMapValuesTest.java    |  35 +++-
 .../internals/KStreamTransformValuesTest.java      |  89 +++++++--
 .../streams/kstream/internals/KTableImplTest.java  |   8 +-
 22 files changed, 654 insertions(+), 207 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0d1d201..ddaa61e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -108,7 +108,9 @@ public interface KStream<K, V> {
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
      * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
      */
     <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
@@ -142,9 +144,12 @@ public interface KStream<K, V> {
      * @see #selectKey(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
      * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
     <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
 
@@ -176,12 +181,50 @@ public interface KStream<K, V> {
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
 
     /**
+     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation (cf.
+     * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+     * <p>
+     * The example below counts the number of tokens of key and value strings.
+     * <pre>{@code
+     * KStream<String, String> inputStream = builder.stream("topic");
+     * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+     *     Integer apply(String readOnlyKey, String value) {
+     *         return readOnlyKey.split(" ").length + value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
+
+    /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
      * can be altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
@@ -220,9 +263,12 @@ public interface KStream<K, V> {
      * @see #selectKey(KeyValueMapper)
      * @see #map(KeyValueMapper)
      * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
      * @see #flatMapValues(ValueMapper)
+     * @see #flatMapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
     <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
@@ -260,12 +306,61 @@ public interface KStream<K, V> {
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      */
     <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
 
     /**
+     * Create a new {@code KStream} by transforming the value of each input record into zero or more records with the
+     * same (unmodified) key in the output stream (value type can be altered arbitrarily).
+     * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
+     * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+     * for stateful value transformation).
+     * <p>
+     * The example below splits input records {@code <Integer:String>}, with key=1, containing sentences as values
+     * into their words.
+     * <pre>{@code
+     * KStream<Integer, String> inputStream = builder.stream("topic");
+     * KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
+     *     Iterable<String> apply(Integer readOnlyKey, String value) {
+     *         if (readOnlyKey == 1) {
+     *             return Arrays.asList(value.split(" "));
+     *         } else {
+     *             return Arrays.asList(value);
+     *         }
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+     * and the return value must not be {@code null}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes the new output values
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+     * @see #selectKey(KeyValueMapper)
+     * @see #map(KeyValueMapper)
+     * @see #flatMap(KeyValueMapper)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     */
+    <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
+
+    /**
      * Print the records of this stream to {@code System.out}.
      * This function will use the generated name of the parent processor node to label the key/value pairs printed to
      * the console.
@@ -884,14 +979,12 @@ public interface KStream<K, V> {
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
      * // create store
-     * StateStoreSupplier myStore = Stores.create("myTransformState")
-     *     .withKeys(...)
-     *     .withValues(...)
-     *     .persistent() // optional
-     *     .build();
-     *
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
      * // register store
-     * builder.addStore(myStore);
+     * builder.addStateStore(keyValueStoreBuilder);
      *
      * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
      * }</pre>
@@ -942,6 +1035,7 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type)
      * @see #flatMap(KeyValueMapper)
      * @see #transformValues(ValueTransformerSupplier, String...)
+     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #process(ProcessorSupplier, String...)
      */
     <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
@@ -959,14 +1053,12 @@ public interface KStream<K, V> {
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
      * // create store
-     * StateStoreSupplier myStore = Stores.create("myValueTransformState")
-     *     .withKeys(...)
-     *     .withValues(...)
-     *     .persistent() // optional
-     *     .build();
-     *
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
      * // register store
-     * builder.addStore(myStore);
+     * builder.addStateStore(keyValueStoreBuilder);
      *
      * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
      * }</pre>
@@ -1012,12 +1104,83 @@ public interface KStream<K, V> {
      * @param <VR>                     the value type of the result stream
      * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
      * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
      * @see #transform(TransformerSupplier, String...)
      */
     <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                         final String... stateStoreNames);
 
     /**
+     * Transform the value of each input record into a new value (with possible new type) of the output record.
+     * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
+     * record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
+     * periodic actions can be performed.
+     * <p>
+     * In order to assign a state, the state must be created and registered beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * <p>
+     * Within the {@link ValueTransformerWithKey}, the state is obtained via the
+     * {@link ProcessorContext}.
+     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+     * pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
+     * ProcessorContext.forward()}.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private StateStore state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = context.getStateStore("myValueTransformState");
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * So, setting a new value preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+     * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+     *
+     * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+     *                                 {@link ValueTransformerWithKey}
+     * @param stateStoreNames          the names of the state stores used by the processor
+     * @param <VR>                     the value type of the result stream
+     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     * @see #transform(TransformerSupplier, String...)
+     */
+    <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+                                        final String... stateStoreNames);
+
+    /**
      * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
      * {@link ProcessorSupplier}).
      * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
@@ -1028,14 +1191,12 @@ public interface KStream<K, V> {
      * In order to assign a state, the state must be created and registered beforehand:
      * <pre>{@code
      * // create store
-     * StateStoreSupplier myStore = Stores.create("myProcessorState")
-     *     .withKeys(...)
-     *     .withValues(...)
-     *     .persistent() // optional
-     *     .build();
-     *
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
      * // register store
-     * builder.addStore(myStore);
+     * builder.addStateStore(keyValueStoreBuilder);
      *
      * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
      * }</pre>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 33e56aa..3290150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -345,7 +345,7 @@ public interface KTable<K, V> {
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
-     * (with possible new type)in the new {@code KTable}.
+     * (with possible new type) in the new {@code KTable}.
      * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
      * computes a new value for it, resulting in an update record for the result {@code KTable}.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -379,7 +379,42 @@ public interface KTable<K, V> {
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
-     * (with possible new type)in the new {@code KTable}.
+     * (with possible new type) in the new {@code KTable}.
+     * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
+     * record and computes a new value for it, resulting in an update record for the result {@code KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of tokens of value and key strings.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+     *     Integer apply(String readOnlyKey, String value) {
+     *          return readOnlyKey.split(" ").length + value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@code KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapperWithKey)
+     * record stream mapping}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+     * @param <VR>   the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
+
+    /**
+     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+     * (with possible new type) in the new {@code KTable}.
      * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
      * computes a new value for it, resulting in an update record for the result {@code KTable}.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -423,7 +458,52 @@ public interface KTable<K, V> {
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
-     * (with possible new type)in the new {@code KTable}.
+     * (with possible new type) in the new {@code KTable}.
+     * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update
+     * record and computes a new value for it, resulting in an update record for the result {@code KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of tokens of value and key strings.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+     *     Integer apply(String readOnlyKey, String value) {
+     *          return readOnlyKey.split(" ").length + value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * To query the local {@link KeyValueStore} representing outputTable above, it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@code KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapper)
+     * record stream mapping}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+     * @param materialized  a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                      should be materialized. Cannot be {@code null}
+     * @param <VR>   the value type of the result {@code KTable}
+     *
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+     * (with possible new type) in the new {@code KTable}.
      * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
      * computes a new value for it, resulting in an update record for the result {@code KTable}.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -471,7 +551,7 @@ public interface KTable<K, V> {
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
-     * (with possible new type)in the new {@code KTable}.
+     * (with possible new type) in the new {@code KTable}.
      * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
      * computes a new value for it, resulting in an update record for the result {@code KTable}.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index 51396b5..be550a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -29,9 +29,13 @@ package org.apache.kafka.streams.kstream;
  * @param <VR> mapped value type
  * @see KeyValueMapper
  * @see ValueTransformer
+ * @see ValueTransformerWithKey
  * @see KStream#mapValues(ValueMapper)
+ * @see KStream#mapValues(ValueMapperWithKey)
  * @see KStream#flatMapValues(ValueMapper)
+ * @see KStream#flatMapValues(ValueMapperWithKey)
  * @see KTable#mapValues(ValueMapper)
+ * @see KTable#mapValues(ValueMapperWithKey)
  */
 public interface ValueMapper<V, VR> {
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
similarity index 57%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
index 51396b5..b20c61a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapperWithKey.java
@@ -16,30 +16,37 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 /**
- * The {@code ValueMapper} interface for mapping a value to a new value of arbitrary type.
- * This is a stateless record-by-record operation, i.e, {@link #apply(Object)} is invoked individually for each record
- * of a stream (cf. {@link ValueTransformer} for stateful value transformation).
- * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's
- * key is preserved.
+ * The {@code ValueMapperWithKey} interface for mapping a value to a new value of arbitrary type.
+ * This is a stateless record-by-record operation, i.e., {@link #apply(Object, Object)} is invoked individually for each
+ * record of a stream (cf. {@link ValueTransformer} for stateful value transformation).
+ * If {@code ValueMapperWithKey} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the
+ * record's key is preserved.
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
  * If a record's key and value should be modified {@link KeyValueMapper} can be used.
  *
+ * @param <K>  key type
  * @param <V>  value type
  * @param <VR> mapped value type
  * @see KeyValueMapper
  * @see ValueTransformer
+ * @see ValueTransformerWithKey
  * @see KStream#mapValues(ValueMapper)
+ * @see KStream#mapValues(ValueMapperWithKey)
  * @see KStream#flatMapValues(ValueMapper)
+ * @see KStream#flatMapValues(ValueMapperWithKey)
  * @see KTable#mapValues(ValueMapper)
+ * @see KTable#mapValues(ValueMapperWithKey)
  */
-public interface ValueMapper<V, VR> {
+public interface ValueMapperWithKey<K, V, VR> {
 
     /**
-     * Map the given value to a new value.
+     * Map the given key and value to a new value.
      *
-     * @param value the value to be mapped
+     * @param readOnlyKey the read-only key
+     * @param value       the value to be mapped
      * @return the new value
      */
-    VR apply(final V value);
-}
+    VR apply(final K readOnlyKey, final V value);
+}
\ No newline at end of file
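A concrete illustration of the interface above -- a sketch with a hypothetical
class name and tagging logic, not code from this patch:

    import org.apache.kafka.streams.kstream.ValueMapperWithKey;

    // Tags each value with its (read-only) key; the key itself is never changed.
    public class KeyTaggingMapper implements ValueMapperWithKey<String, Long, String> {
        @Override
        public String apply(final String readOnlyKey, final Long value) {
            return readOnlyKey + "=" + value;
        }
    }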
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 0a8e890..1802a61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -40,7 +40,9 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
  * @param <V>  value type
  * @param <VR> transformed value type
  * @see ValueTransformerSupplier
+ * @see ValueTransformerWithKeySupplier
  * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  */
 public interface ValueTransformer<V, VR> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index 78234ae..10cf543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -23,7 +23,10 @@ package org.apache.kafka.streams.kstream;
  * @param <V>  value type
  * @param <VR> transformed value type
  * @see ValueTransformer
+ * @see ValueTransformerWithKey
+ * @see ValueTransformerWithKeySupplier
  * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  * @see TransformerSupplier
  * @see KStream#transform(TransformerSupplier, String...)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 0a8e890..128c61f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -22,28 +22,34 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 
 /**
- * The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
- * This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each
+ * The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
+ * This is a stateful record-by-record operation, i.e., {@link #transform(Object, Object)} is invoked individually for each
  * record of a stream and can access and modify a state that is available beyond a single call of
- * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation).
- * Additionally, this {@code ValueTransformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule}
- * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context.
- * If {@code ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved.
+ * {@link #transform(Object, Object)} (cf. {@link ValueMapper} for stateless value transformation).
+ * Additionally, this {@code ValueTransformerWithKey} can
+ * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
+ * {@link Punctuator#punctuate(long) called periodically} with the provided context.
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * If {@code ValueTransformerWithKey} is applied to a {@link KeyValue} pair record the record's key is preserved.
  * <p>
- * Use {@link ValueTransformerSupplier} to provide new instances of {@code ValueTransformer} to Kafka Stream's runtime.
+ * Use {@link ValueTransformerWithKeySupplier} to provide new instances of {@code ValueTransformerWithKey} to
+ * Kafka Streams' runtime.
  * <p>
  * If a record's key and value should be modified {@link Transformer} can be used.
  *
+ * @param <K>  key type
  * @param <V>  value type
  * @param <VR> transformed value type
- * @see ValueTransformerSupplier
+ * @see ValueTransformer
+ * @see ValueTransformerWithKeySupplier
  * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  */
-public interface ValueTransformer<V, VR> {
+public interface ValueTransformerWithKey<K, V, VR> {
 
     /**
      * Initialize this transformer.
@@ -54,12 +60,12 @@ public interface ValueTransformer<V, VR> {
      * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
      * <p>
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
-     * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
+     * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
      * <p>
      * Note that using {@link ProcessorContext#forward(Object, Object)},
      * {@link ProcessorContext#forward(Object, Object, int)}, or
      * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
-     * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
+     * {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
      * @throws IllegalStateException If store gets registered after initialization is already finished
@@ -68,8 +74,8 @@ public interface ValueTransformer<V, VR> {
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given value to a new value.
-     * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...)
+     * Transform the given key and value to a new value.
+     * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
@@ -78,32 +84,11 @@ public interface ValueTransformer<V, VR> {
      * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
      *
-     * @param value the value to be transformed
+     * @param readOnlyKey the read-only key
+     * @param value       the value to be transformed
      * @return the new value
      */
-    VR transform(final V value);
-
-    /**
-     * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedule itself} with
-     * the context during {@link #init(ProcessorContext) initialization}.
-     * <p>
-     * It is not possible to return any new output records within {@code punctuate}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an
-     * {@link StreamsException exception}.
-     * Furthermore, {@code punctuate} must return {@code null}.
-     * <p>
-     * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to
-     * timestamps return by the used {@link TimestampExtractor})
-     * and not based on wall-clock time.
-     *
-     * @deprecated Please use {@link Punctuator} functional interface instead.
-     *
-     * @param timestamp the stream time when {@code punctuate} is being called
-     * @return must return {@code null}&mdash;otherwise, an {@link StreamsException exception} will be thrown
-     */
-    @Deprecated
-    VR punctuate(final long timestamp);
+    VR transform(final K readOnlyKey, final V value);
 
     /**
      * Close this processor and clean up any resources.
@@ -113,5 +98,4 @@ public interface ValueTransformer<V, VR> {
      * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
      */
     void close();
-
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
similarity index 76%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
index 78234ae..766c84e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
@@ -16,24 +16,18 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 /**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
+ * @param <K>  key type
  * @param <V>  value type
  * @param <VR> transformed value type
  * @see ValueTransformer
+ * @see ValueTransformerWithKey
  * @see KStream#transformValues(ValueTransformerSupplier, String...)
+ * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  * @see TransformerSupplier
  * @see KStream#transform(TransformerSupplier, String...)
  */
-public interface ValueTransformerSupplier<V, VR> {
-
-    /**
-     * Return a new {@link ValueTransformer} instance.
-     *
-     * @return a new {@link ValueTransformer} instance.
-     */
-    ValueTransformer<V, VR> get();
+public interface ValueTransformerWithKeySupplier<K, V, VR> {
+    ValueTransformerWithKey<K, V, VR> get();
 }
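To show how the supplier and transformer are meant to work together, a sketch
(hypothetical store name "countsStore", which would have to be registered via
builder.addStateStore(...) beforehand):

    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Counts how often each key has been seen, keeping the counts in "countsStore".
    public class CountingTransformerSupplier
            implements ValueTransformerWithKeySupplier<String, String, Long> {

        @Override
        public ValueTransformerWithKey<String, String, Long> get() {
            // return a fresh instance per call so each task owns its own state handle
            return new ValueTransformerWithKey<String, String, Long>() {
                private KeyValueStore<String, Long> counts;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    counts = (KeyValueStore<String, Long>) context.getStateStore("countsStore");
                }

                @Override
                public Long transform(final String readOnlyKey, final String value) {
                    final Long previous = counts.get(readOnlyKey);
                    final long updated = (previous == null ? 0L : previous) + 1L;
                    counts.put(readOnlyKey, updated);
                    return updated; // new value; the key is forwarded unchanged
                }

                @Override
                public void close() { }
            };
        }
    }

Wired up as inputStream.transformValues(new CountingTransformerSupplier(), "countsStore").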
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 26e404e..51fe820 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -18,9 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -111,5 +119,75 @@ public abstract class AbstractStream<K> {
                 .enableCaching();
     }
 
+    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
+        Objects.requireNonNull(valueMapper, "valueMapper can't be null");
+        return new ValueMapperWithKey<K, V, VR>() {
+            @Override
+            public VR apply(final K readOnlyKey, final V value) {
+                return valueMapper.apply(value);
+            }
+        };
+    }
 
+    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+        final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
+        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+            @Override
+            public InternalValueTransformerWithKey<K, V, VR> get() {
+                return new InternalValueTransformerWithKey<K, V, VR>() {
+                    @Override
+                    public VR punctuate(final long timestamp) {
+                        return valueTransformer.punctuate(timestamp);
+                    }
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        valueTransformer.init(context);
+                    }
+
+                    @Override
+                    public VR transform(final K readOnlyKey, final V value) {
+                        return valueTransformer.transform(value);
+                    }
+
+                    @Override
+                    public void close() {
+                        valueTransformer.close();
+                    }
+                };
+            }
+        };
+    }
+
+    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) {
+        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerWithKeySupplier can't be null");
+        final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get();
+        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+            @Override
+            public InternalValueTransformerWithKey<K, V, VR> get() {
+                return new InternalValueTransformerWithKey<K, V, VR>() {
+                    @Override
+                    public VR punctuate(final long timestamp) {
+                        throw new StreamsException("ValueTransformerWithKey#punctuate should not be called.");
+                    }
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        valueTransformerWithKey.init(context);
+                    }
+
+                    @Override
+                    public VR transform(final K readOnlyKey, final V value) {
+                        return valueTransformerWithKey.transform(readOnlyKey, value);
+                    }
+
+                    @Override
+                    public void close() {
+                        valueTransformerWithKey.close();
+                    }
+                };
+            }
+        };
+    }
 }
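The adapter trick above is what lets the existing key-agnostic overloads share a
single with-key code path. A standalone sketch of the same idea (hypothetical
demo class; the wrapper simply drops the key):

    import org.apache.kafka.streams.kstream.ValueMapper;
    import org.apache.kafka.streams.kstream.ValueMapperWithKey;

    public class WithKeyAdapterSketch {
        public static void main(final String[] args) {
            final ValueMapper<String, Integer> plain = new ValueMapper<String, Integer>() {
                @Override
                public Integer apply(final String value) {
                    return value.length();
                }
            };
            // equivalent to what AbstractStream.withKey(plain) produces
            final ValueMapperWithKey<String, String, Integer> adapted =
                new ValueMapperWithKey<String, String, Integer>() {
                    @Override
                    public Integer apply(final String readOnlyKey, final String value) {
                        return plain.apply(value); // read-only key is ignored
                    }
                };
            System.out.println(adapted.apply("some-key", "hello")); // prints 5
        }
    }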
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
similarity index 55%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
index 78234ae..636e409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
@@ -14,26 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream;
+package org.apache.kafka.streams.kstream.internals;
 
 
-/**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
- * @param <V>  value type
- * @param <VR> transformed value type
- * @see ValueTransformer
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see Transformer
- * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)
- */
-public interface ValueTransformerSupplier<V, VR> {
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 
-    /**
-     * Return a new {@link ValueTransformer} instance.
-     *
-     * @return a new {@link ValueTransformer} instance.
-     */
-    ValueTransformer<V, VR> get();
+public interface InternalValueTransformerWithKey<K, V, VR> extends ValueTransformerWithKey<K, V, VR> {
+    VR punctuate(final long timestamp);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
similarity index 55%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
index 78234ae..3418e71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
@@ -14,26 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream;
+package org.apache.kafka.streams.kstream.internals;
 
-
-/**
- * A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
- *
- * @param <V>  value type
- * @param <VR> transformed value type
- * @see ValueTransformer
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see Transformer
- * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)
- */
-public interface ValueTransformerSupplier<V, VR> {
-
-    /**
-     * Return a new {@link ValueTransformer} instance.
-     *
-     * @return a new {@link ValueTransformer} instance.
-     */
-    ValueTransformer<V, VR> get();
-}
+public interface InternalValueTransformerWithKeySupplier<K, V, VR> {
+    InternalValueTransformerWithKey<K, V, VR> get();
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index ab9d227..7d0d270 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper;
+    private final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper;
 
-    KStreamFlatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+    KStreamFlatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper) {
         this.mapper = mapper;
     }
 
@@ -37,7 +37,7 @@ class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
     private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            Iterable<? extends V1> newValues = mapper.apply(value);
+            final Iterable<? extends V1> newValues = mapper.apply(key, value);
             for (V1 v : newValues) {
                 context().forward(key, v);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8e80315..db4d238 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -35,7 +35,9 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -176,6 +178,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
+        return mapValues(withKey(mapper));
+    }
+
+    @Override
+    public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(MAPVALUES_NAME);
 
@@ -329,6 +336,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+        return flatMapValues(withKey(mapper));
+    }
+
+    @Override
+    public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
@@ -514,9 +526,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
-        String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
+    }
+
+    @Override
+    public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+                                               final String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
+    }
+
+    private <VR> KStream<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier,
+                                                final String... stateStoreNames) {
+        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(internalValueTransformerWithKeySupplier), this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
             builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 8897a6c..28c120e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V, V1> mapper;
+    private final ValueMapperWithKey<K, V, V1> mapper;
 
-    public KStreamMapValues(ValueMapper<V, V1> mapper) {
+    public KStreamMapValues(final ValueMapperWithKey<K, V, V1> mapper) {
         this.mapper = mapper;
     }
 
@@ -36,9 +36,9 @@ class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
     private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(final K key, final V value) {
-            V1 newValue = mapper.apply(value);
-            context().forward(key, newValue);
+        public void process(final K readOnlyKey, final V value) {
+            final V1 newValue = mapper.apply(readOnlyKey, value);
+            context().forward(readOnlyKey, newValue);
         }
     }
 }
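
With KStreamMapValues now forwarding the read-only key, applications can consult the key while mapping values, still without triggering a repartition. A minimal usage sketch (topic names, serdes, and types are illustrative, not part of this commit):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueMapperWithKey;

    public class MapValuesWithKeyExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> stream = builder.stream("input-topic",
                    Consumed.with(Serdes.String(), Serdes.Long()));
            stream.mapValues(new ValueMapperWithKey<String, Long, String>() {
                @Override
                public String apply(final String readOnlyKey, final Long value) {
                    // the key must be treated as read-only; mutating it could
                    // invalidate the stream's partitioning
                    return readOnlyKey + "-" + value;
                }
            }).to("output-topic");
            builder.build();
        }
    }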
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 55c16cc..ace4f69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -19,8 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -36,9 +34,9 @@ import java.util.Map;
 
 public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
 
-    private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
+    private final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
 
-    public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
+    public KStreamTransformValues(final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) {
         this.valueTransformerSupplier = valueTransformerSupplier;
     }
 
@@ -49,10 +47,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
 
-        private final ValueTransformer<V, R> valueTransformer;
+        private final InternalValueTransformerWithKey<K, V, R> valueTransformer;
         private ProcessorContext context;
 
-        public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
+        public KStreamTransformValuesProcessor(final InternalValueTransformerWithKey<K, V, R> valueTransformer) {
             this.valueTransformer = valueTransformer;
         }
 
@@ -168,7 +166,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
         @Override
         public void process(K key, V value) {
-            context.forward(key, valueTransformer.transform(value));
+            context.forward(key, valueTransformer.transform(key, value));
         }
 
         @SuppressWarnings("deprecation")
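
Since transform(key, value) is now routed through the key-aware interface, value transformations can read the record key as well. A minimal sketch of the public API this enables (names and types are illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class TransformValuesWithKeyExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> stream = builder.stream("numbers",
                    Consumed.with(Serdes.String(), Serdes.Long()));
            stream.transformValues(new ValueTransformerWithKeySupplier<String, Long, String>() {
                @Override
                public ValueTransformerWithKey<String, Long, String> get() {
                    return new ValueTransformerWithKey<String, Long, String>() {
                        @Override
                        public void init(final ProcessorContext context) { }

                        @Override
                        public String transform(final String readOnlyKey, final Long value) {
                            // unlike ValueTransformer, the record key is visible here
                            return readOnlyKey + ":" + value;
                        }

                        @Override
                        public void close() { }
                    };
                }
            }).to("labelled-numbers");
            builder.build();
        }
    }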
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3bc6f4b..4019039 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -250,7 +251,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @SuppressWarnings("deprecation")
-    private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
+    private <V1> KTable<K, V1> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper,
                                            final Serde<V1> valueSerde,
                                            final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
@@ -271,24 +272,37 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
-        return mapValues(mapper, null, (String) null);
+        return doMapValues(withKey(mapper), null, null);
+    }
+
+    @Override
+    public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
+        return doMapValues(mapper, null, null);
     }
 
     @Override
     public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                                         final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return mapValues(withKey(mapper), materialized);
+    }
+
+    @Override
+    public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
                 = new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME);
         final String name = builder.newProcessorName(MAPVALUES_NAME);
-        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
-                                                                                          mapper,
-                                                                                          materializedInternal.storeName());
+        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(
+                this,
+                mapper,
+                materializedInternal.storeName());
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal)
-                                                              .materialize(),
-                                                      name);
+        builder.internalTopologyBuilder.addStateStore(
+                new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+                name);
         return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
     }
 
@@ -297,11 +311,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                         final Serde<V1> valueSerde,
                                         final String queryableStoreName) {
-        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
-        if (queryableStoreName != null) {
-            storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
-        }
-        return doMapValues(mapper, valueSerde, storeSupplier);
+        return mapValues(withKey(mapper), Materialized.<K, V1, KeyValueStore<Bytes, byte[]>>as(queryableStoreName)
+                .withValueSerde(valueSerde).withKeySerde(this.keySerde));
     }
 
     @SuppressWarnings("deprecation")
@@ -310,7 +321,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                          final Serde<V1> valueSerde,
                                          final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doMapValues(mapper, valueSerde, storeSupplier);
+        return doMapValues(withKey(mapper), valueSerde, storeSupplier);
     }
 
     @SuppressWarnings("deprecation")
@@ -545,9 +556,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     public KStream<K, V> toStream() {
         String name = builder.newProcessorName(TOSTREAM_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
             @Override
-            public V apply(Change<V> change) {
+            public V apply(final K key, final Change<V> change) {
                 return change.newValue;
             }
         }), this.name);
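
The materialized key-aware overload above can back the mapped table with a queryable store. A usage sketch (store/topic names and serdes are illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.ValueMapperWithKey;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TableMapValuesWithKeyExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<String, Long> table = builder.table("prices",
                    Consumed.with(Serdes.String(), Serdes.Long()));
            final KTable<String, String> labelled = table.mapValues(
                    new ValueMapperWithKey<String, Long, String>() {
                        @Override
                        public String apply(final String readOnlyKey, final Long value) {
                            return readOnlyKey + "=" + value;
                        }
                    },
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("labelled-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.String()));
            builder.build();
        }
    }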
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 41dd7cd..03fa3a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,11 +26,11 @@ import org.apache.kafka.streams.state.KeyValueStore;
 class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
     private final KTableImpl<K, ?, V> parent;
-    private final ValueMapper<? super V, ? extends V1> mapper;
+    private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper;
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapper<? super V, ? extends V1> mapper,
+    public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper,
                            final String queryableName) {
         this.parent = parent;
         this.mapper = mapper;
@@ -65,11 +65,11 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
         sendOldValues = true;
     }
 
-    private V1 computeValue(V value) {
+    private V1 computeValue(final K key, final V value) {
         V1 newValue = null;
 
         if (value != null)
-            newValue = mapper.apply(value);
+            newValue = mapper.apply(key, value);
 
         return newValue;
     }
@@ -91,8 +91,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
         @Override
         public void process(K key, Change<V> change) {
-            V1 newValue = computeValue(change.newValue);
-            V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+            final V1 newValue = computeValue(key, change.newValue);
+            final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
 
             if (queryableName != null) {
                 store.put(key, newValue);
@@ -118,7 +118,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
         @Override
         public V1 get(K key) {
-            return computeValue(parentGetter.get(key));
+            return computeValue(key, parentGetter.get(key));
         }
 
     }
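
Note the null guard in computeValue above: tombstone updates (null values) bypass the user mapper on both the processor path and the value-getter path. A one-method sketch of that contract (illustrative, mirroring but not copying the committed code):

    import org.apache.kafka.streams.kstream.ValueMapperWithKey;

    final class ComputeValueSketch {
        // Tombstones are forwarded as null; the user-supplied mapper never
        // sees a null value.
        static <K, V, VR> VR computeValue(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
                                          final K key, final V value) {
            return value == null ? null : mapper.apply(key, value);
        }
    }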
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 59ab0ff..ecfa3aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
@@ -28,7 +29,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 
 public class KStreamFlatMapValuesTest {
 
@@ -54,24 +55,50 @@ public class KStreamFlatMapValuesTest {
 
         final int[] expectedKeys = {0, 1, 2, 3};
 
-        KStream<Integer, Integer> stream;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
         stream.flatMapValues(mapper).process(processor);
 
         driver.setUp(builder);
-        for (int expectedKey : expectedKeys) {
+        for (final int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, expectedKey);
         }
 
-        assertEquals(8, processor.processed.size());
-
         String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+        assertArrayEquals(expected, processor.processed.toArray());
+    }
+
+    @Test
+    public void testFlatMapValuesWithKeys() {
+        StreamsBuilder builder = new StreamsBuilder();
+
+        ValueMapperWithKey<Integer, Number, Iterable<String>> mapper =
+                new ValueMapperWithKey<Integer, Number, Iterable<String>>() {
+            @Override
+            public Iterable<String> apply(final Integer readOnlyKey, final Number value) {
+                ArrayList<String> result = new ArrayList<>();
+                result.add("v" + value);
+                result.add("k" + readOnlyKey);
+                return result;
+            }
+        };
+
+        final int[] expectedKeys = {0, 1, 2, 3};
+
+        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+
+        stream.flatMapValues(mapper).process(processor);
+
+        driver.setUp(builder);
+        for (final int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, expectedKey);
         }
+
+        String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
+
+        assertArrayEquals(expected, processor.processed.toArray());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 562711d..2009806 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -283,7 +286,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnMapValues() {
-        testStream.mapValues(null);
+        testStream.mapValues((ValueMapper) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnMapValuesWithKey() {
+        testStream.mapValues((ValueMapperWithKey) null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -303,7 +311,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMapValues() {
-        testStream.flatMapValues(null);
+        testStream.flatMapValues((ValueMapper) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
+        testStream.flatMapValues((ValueMapperWithKey) null);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -333,7 +346,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTransformSupplierOnTransformValues() {
-        testStream.transformValues(null);
+        testStream.transformValues((ValueTransformerSupplier) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
+        testStream.transformValues((ValueTransformerWithKeySupplier) null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index e4bf23e..2cedfb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -22,12 +22,13 @@ import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 
 public class KStreamMapValuesTest {
 
@@ -61,13 +62,37 @@ public class KStreamMapValuesTest {
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, Integer.toString(expectedKey));
         }
+        String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
 
-        assertEquals(4, processor.processed.size());
+        assertArrayEquals(expected, processor.processed.toArray());
+    }
 
-        String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
+    @Test
+    public void testMapValuesWithKeys() {
+        StreamsBuilder builder = new StreamsBuilder();
+
+        ValueMapperWithKey<Integer, CharSequence, Integer> mapper =
+                new ValueMapperWithKey<Integer, CharSequence, Integer>() {
+            @Override
+            public Integer apply(final Integer readOnlyKey, final CharSequence value) {
+                return value.length() + readOnlyKey;
+            }
+        };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+        stream.mapValues(mapper).process(processor);
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+        driver.setUp(builder);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, Integer.toString(expectedKey));
         }
+        String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
+
+        assertArrayEquals(expected, processor.processed.toArray());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 16f121e..1b34fab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -23,7 +23,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -31,8 +33,8 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
 public class KStreamTransformValuesTest {
 
@@ -86,22 +88,82 @@ public class KStreamTransformValuesTest {
         for (int expectedKey : expectedKeys) {
             driver.process(topicName, expectedKey, expectedKey * 10);
         }
+        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
-        assertEquals(4, processor.processed.size());
+        assertArrayEquals(expected, processor.processed.toArray());
+    }
 
-        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+    @Test
+    public void testTransformWithKey() {
+        StreamsBuilder builder = new StreamsBuilder();
+
+        ValueTransformerWithKeySupplier<Integer, Number, Integer> valueTransformerSupplier =
+                new ValueTransformerWithKeySupplier<Integer, Number, Integer>() {
+            public ValueTransformerWithKey<Integer, Number, Integer> get() {
+                return new ValueTransformerWithKey<Integer, Number, Integer>() {
+                    private int total = 0;
+                    @Override
+                    public void init(final ProcessorContext context) { }
+
+                    @Override
+                    public Integer transform(final Integer readOnlyKey, final Number value) {
+                        total += value.intValue() + readOnlyKey;
+                        return total;
+                    }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+        stream.transformValues(valueTransformerSupplier).process(processor);
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+        driver.setUp(builder);
+        for (int expectedKey : expectedKeys) {
+            driver.process(topicName, expectedKey, expectedKey * 10);
         }
+        String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
+
+        assertArrayEquals(expected, processor.processed.toArray());
     }
 
     @Test
     public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
-        final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new ValueTransformerSupplier<Integer, Integer>() {
+        final BadValueTransformer badValueTransformer = new BadValueTransformer();
+        final KStreamTransformValues<Integer, Integer, Integer> transformValue = new KStreamTransformValues<>(new InternalValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
             @Override
-            public ValueTransformer<Integer, Integer> get() {
-                return new BadValueTransformer();
+            public InternalValueTransformerWithKey<Integer, Integer, Integer> get() {
+                return new InternalValueTransformerWithKey<Integer, Integer, Integer>() {
+                    @Override
+                    public Integer punctuate(final long timestamp) {
+                        throw new StreamsException("ValueTransformerWithKey#punctuate should not be called.");
+                    }
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        badValueTransformer.init(context);
+                    }
+
+                    @Override
+                    public Integer transform(final Integer readOnlyKey, final Integer value) {
+                        return badValueTransformer.transform(readOnlyKey, value);
+                    }
+
+                    @Override
+                    public void close() {
+                        badValueTransformer.close();
+                    }
+                };
             }
         });
 
@@ -137,16 +199,16 @@ public class KStreamTransformValuesTest {
         }
     }
 
-    private static final class BadValueTransformer implements ValueTransformer<Integer, Integer> {
+    private static final class BadValueTransformer implements ValueTransformerWithKey<Integer, Integer, Integer> {
         private ProcessorContext context;
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             this.context = context;
         }
 
         @Override
-        public Integer transform(Integer value) {
+        public Integer transform(final Integer key, final Integer value) {
             if (value == 0) {
                 context.forward(null, null);
             }
@@ -160,11 +222,6 @@ public class KStreamTransformValuesTest {
         }
 
         @Override
-        public Integer punctuate(long timestamp) {
-            return 1; // any not-null falue
-        }
-
-        @Override
         public void close() { }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 9539b45..a7aed2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -393,7 +394,12 @@ public class KTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnMapValues() {
-        table.mapValues(null);
+        table.mapValues((ValueMapper) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnMapValueWithKey() {
+        table.mapValues((ValueMapperWithKey) null);
     }
 
     @SuppressWarnings("deprecation")

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].