You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/10 01:24:17 UTC

kafka git commit: MINOR: putting back kstream stateful transform methods

Repository: kafka
Updated Branches:
  refs/heads/trunk 96209b1e7 -> c67ca6588


MINOR: putting back kstream stateful transform methods

guozhangwang

* added back type safe stateful transform methods (kstream.transform() and kstream.transformValues())
* changed kstream.process() to void

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #292 from ymatsuda/transform_method


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c67ca658
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c67ca658
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c67ca658

Branch: refs/heads/trunk
Commit: c67ca65889721e3af7527ab26df49a9fb4db87ef
Parents: 96209b1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Oct 9 16:28:40 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 9 16:28:40 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 21 ++++-
 .../kafka/streams/kstream/Transformer.java      | 57 ++++++++++++
 .../streams/kstream/TransformerSupplier.java    | 24 +++++
 .../kafka/streams/kstream/ValueTransformer.java | 56 ++++++++++++
 .../kstream/ValueTransformerSupplier.java       | 24 +++++
 .../streams/kstream/internals/KStreamImpl.java  | 30 ++++++-
 .../kstream/internals/KStreamTransform.java     | 71 +++++++++++++++
 .../internals/KStreamTransformValues.java       | 69 +++++++++++++++
 .../processor/internals/StreamThread.java       | 10 ++-
 .../kstream/internals/KStreamImplTest.java      |  5 +-
 .../kstream/internals/KStreamTransformTest.java | 93 ++++++++++++++++++++
 .../internals/KStreamTransformValuesTest.java   | 92 +++++++++++++++++++
 12 files changed, 539 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ecec882..915cf1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 /**
  * KStream is an abstraction of a stream of key-value pairs.
- * 
+ *
  * @param <K> the type of keys
  * @param <V> the type of values
  */
@@ -151,10 +151,27 @@ public interface KStream<K, V> {
     void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
 
     /**
+     * Applies a stateful transformation to all elements in this stream.
+     *
+     * @param transformerSupplier the class of TransformerDef
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+
+    /**
+     * Applies a stateful transformation to all values in this stream.
+     *
+     * @param valueTransformerSupplier the class of TransformerDef
+     * @return KStream
+     */
+    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+
+    /**
      * Processes all elements in this stream by applying a processor.
      *
      * @param processorSupplier the supplier of the Processor to use
      * @return the new stream containing the processed output
      */
-    <K1, V1> KStream<K1, V1> process(ProcessorSupplier<K, V> processorSupplier);
+    void process(ProcessorSupplier<K, V> processorSupplier);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
new file mode 100644
index 0000000..b67f619
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface Transformer<K, V, R> {
+
+    /**
+     * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
+     * that contains it is initialized.
+     * <p>
+     * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+     *
+     * @param context the context; may not be null
+     */
+    void init(ProcessorContext context);
+
+    /**
+     * Transform the message with the given key and value.
+     *
+     * @param key the key for the message
+     * @param value the value for the message
+     * @return new value
+     */
+    R transform(K key, V value);
+
+    /**
+     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * during {@link #init(ProcessorContext) initialization}.
+     *
+     * @param timestamp the stream time when this method is being called
+     */
+    void punctuate(long timestamp);
+
+    /**
+     * Close this processor and clean up any resources.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
new file mode 100644
index 0000000..2c2d8dd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface TransformerSupplier<K, V, R> {
+
+    Transformer<K, V, R> get();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
new file mode 100644
index 0000000..5b9e2ff
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface ValueTransformer<V, R> {
+
+    /**
+     * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
+     * that contains it is initialized.
+     * <p>
+     * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+     *
+     * @param context the context; may not be null
+     */
+    void init(ProcessorContext context);
+
+    /**
+     * Transform the message with the given key and value.
+     *
+     * @param value the value for the message
+     * @return new value
+     */
+    R transform(V value);
+
+    /**
+     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * during {@link #init(ProcessorContext) initialization}.
+     *
+     * @param timestamp the stream time when this method is being called
+     */
+    void punctuate(long timestamp);
+
+    /**
+     * Close this processor and clean up any resources.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
new file mode 100644
index 0000000..5c053c7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface ValueTransformerSupplier<V, R> {
+
+    ValueTransformer<V, R> get();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cff97d6..8f56e09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -44,6 +46,10 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
 
     private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";
 
+    private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-";
+
+    private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-";
+
     private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";
 
     private static final String BRANCH_NAME = "KAFKA-BRANCH-";
@@ -191,11 +197,27 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> process(final ProcessorSupplier<K, V> processorSupplier) {
-        String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+        String name = TRANSFORM_NAME + INDEX.getAndIncrement();
 
-        topology.addProcessor(name, processorSupplier, this.name);
+        topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+        String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
 
         return new KStreamImpl<>(topology, name);
     }
+
+    @Override
+    public void process(final ProcessorSupplier<K, V> processorSupplier) {
+        String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, processorSupplier, this.name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
new file mode 100644
index 0000000..7ebab0e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+
+    private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
+
+    public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
+        this.transformerSupplier = transformerSupplier;
+    }
+
+    @Override
+    public Processor<K1, V1> get() {
+        return new KStreamTransformProcessor(transformerSupplier.get());
+    }
+
+    public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+
+        private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
+        private ProcessorContext context;
+
+        public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
+            this.transformer = transformer;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            transformer.init(context);
+            this.context = context;
+        }
+
+        @Override
+        public void process(K1 key, V1 value) {
+            KeyValue<K2, V2> pair = transformer.transform(key, value);
+            context.forward(pair.key, pair.value);
+        }
+
+        @Override
+        public void punctuate(long timestamp) {
+            transformer.punctuate(timestamp);
+        }
+
+        @Override
+        public void close() {
+            transformer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
new file mode 100644
index 0000000..6f989e6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
+
+    private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
+
+    public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
+        this.valueTransformerSupplier = valueTransformerSupplier;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
+    }
+
+    public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
+
+        private final ValueTransformer valueTransformer;
+        private ProcessorContext context;
+
+        public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
+            this.valueTransformer = valueTransformer;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueTransformer.init(context);
+            this.context = context;
+        }
+
+        @Override
+        public void process(K key, V value) {
+            context.forward(key, valueTransformer.transform(value));
+        }
+
+        @Override
+        public void punctuate(long timestamp) {
+            valueTransformer.punctuate(timestamp);
+        }
+
+        @Override
+        public void close() {
+            valueTransformer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 95a923d..4a68332 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -159,7 +159,7 @@ public class StreamThread extends Thread {
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
-    
+
     private Consumer<byte[], byte[]> createRestoreConsumer() {
         log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
         return new KafkaConsumer<>(config.getConsumerConfigs(),
@@ -240,9 +240,11 @@ public class StreamThread extends Thread {
                 // try to fetch some records if necessary
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
 
-                for (StreamTask task : tasks.values()) {
-                    for (TopicPartition partition : task.partitions()) {
-                        task.addRecords(partition, records.records(partition));
+                if (!records.isEmpty()) {
+                    for (StreamTask task : tasks.values()) {
+                        for (TopicPartition partition : task.partitions()) {
+                            task.addRecords(partition, records.records(partition));
+                        }
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 875712a..2db488c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -119,7 +119,7 @@ public class KStreamImplTest {
 
         stream4.to("topic-5");
 
-        stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7");
+        stream5.through("topic-6").process(new MockProcessorSupplier<>());
 
         assertEquals(2 + // sources
             2 + // stream1
@@ -131,8 +131,7 @@ public class KStreamImplTest {
             2 + 3 + // stream5
             1 + // to
             2 + // through
-            1 + // process
-            1, // to
+            1, // process
             builder.build().processors().size());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
new file mode 100644
index 0000000..e397dd1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamTransformTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+
+    @Test
+    public void testTransform() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> transformerSupplier =
+            new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
+                public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
+                    return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+
+                        private int total = 0;
+
+                        @Override
+                        public void init(ProcessorContext context) {
+                        }
+
+                        @Override
+                        public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
+                            total += value;
+                            return KeyValue.pair(key * 2, total);
+                        }
+
+                        @Override
+                        public void punctuate(long timestamp) {
+                        }
+
+                        @Override
+                        public void close() {
+                        }
+                    };
+                }
+            };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        KStream<Integer, Integer> stream;
+        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.transform(transformerSupplier).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+        }
+
+        assertEquals(4, processor.processed.size());
+
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
new file mode 100644
index 0000000..c5c9b39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamTransformValuesTest {
+
+    private String topicName = "topic";
+
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+
+    @Test
+    public void testTransform() {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        ValueTransformerSupplier<Integer, Integer> valueTransformerSupplier =
+            new ValueTransformerSupplier<Integer, Integer>() {
+                public ValueTransformer<Integer, Integer> get() {
+                    return new ValueTransformer<Integer, Integer>() {
+
+                        private int total = 0;
+
+                        @Override
+                        public void init(ProcessorContext context) {
+                        }
+
+                        @Override
+                        public Integer transform(Integer value) {
+                            total += value;
+                            return total;
+                        }
+
+                        @Override
+                        public void punctuate(long timestamp) {
+                        }
+
+                        @Override
+                        public void close() {
+                        }
+                    };
+                }
+            };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        KStream<Integer, Integer> stream;
+        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+        stream = builder.from(keyDeserializer, valDeserializer, topicName);
+        stream.transformValues(valueTransformerSupplier).process(processor);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+        }
+
+        assertEquals(4, processor.processed.size());
+
+        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+}