You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/10 01:24:17 UTC
kafka git commit: MINOR: putting back kstream stateful transform
methods
Repository: kafka
Updated Branches:
refs/heads/trunk 96209b1e7 -> c67ca6588
MINOR: putting back kstream stateful transform methods
guozhangwang
* added back type safe stateful transform methods (kstream.transform() and kstream.transformValues())
* changed kstream.process() to void
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #292 from ymatsuda/transform_method
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c67ca658
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c67ca658
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c67ca658
Branch: refs/heads/trunk
Commit: c67ca65889721e3af7527ab26df49a9fb4db87ef
Parents: 96209b1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Oct 9 16:28:40 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 9 16:28:40 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 21 ++++-
.../kafka/streams/kstream/Transformer.java | 57 ++++++++++++
.../streams/kstream/TransformerSupplier.java | 24 +++++
.../kafka/streams/kstream/ValueTransformer.java | 56 ++++++++++++
.../kstream/ValueTransformerSupplier.java | 24 +++++
.../streams/kstream/internals/KStreamImpl.java | 30 ++++++-
.../kstream/internals/KStreamTransform.java | 71 +++++++++++++++
.../internals/KStreamTransformValues.java | 69 +++++++++++++++
.../processor/internals/StreamThread.java | 10 ++-
.../kstream/internals/KStreamImplTest.java | 5 +-
.../kstream/internals/KStreamTransformTest.java | 93 ++++++++++++++++++++
.../internals/KStreamTransformValuesTest.java | 92 +++++++++++++++++++
12 files changed, 539 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ecec882..915cf1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
/**
* KStream is an abstraction of a stream of key-value pairs.
- *
+ *
* @param <K> the type of keys
* @param <V> the type of values
*/
@@ -151,10 +151,27 @@ public interface KStream<K, V> {
void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
/**
+ * Applies a stateful transformation to all elements in this stream.
+ *
+ * @param transformerSupplier the supplier of the Transformer to apply to each element
+ * @return the new stream containing the transformed key-value pairs
+ */
+ <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier);
+
+ /**
+ * Applies a stateful transformation to all values in this stream.
+ *
+ * @param valueTransformerSupplier the supplier of the ValueTransformer to apply to each value
+ * @return the new stream containing the original keys and the transformed values
+ */
+ <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier);
+
+ /**
* Processes all elements in this stream by applying a processor.
*
* @param processorSupplier the supplier of the Processor to use
* @return the new stream containing the processed output
*/
- <K1, V1> KStream<K1, V1> process(ProcessorSupplier<K, V> processorSupplier);
+ void process(ProcessorSupplier<K, V> processorSupplier);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
new file mode 100644
index 0000000..b67f619
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface Transformer<K, V, R> {
+
+ /**
+ * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
+ * that contains it is initialized.
+ * <p>
+ * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+ * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+ *
+ * @param context the context; may not be null
+ */
+ void init(ProcessorContext context);
+
+ /**
+ * Transform the message with the given key and value.
+ *
+ * @param key the key for the message
+ * @param value the value for the message
+ * @return the result of transforming the message
+ */
+ R transform(K key, V value);
+
+ /**
+ * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context
+ * during {@link #init(ProcessorContext) initialization}.
+ *
+ * @param timestamp the stream time when this method is being called
+ */
+ void punctuate(long timestamp);
+
+ /**
+ * Close this transformer and clean up any resources.
+ */
+ void close();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
new file mode 100644
index 0000000..2c2d8dd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface TransformerSupplier<K, V, R> {
+ /** Returns the {@link Transformer} to use; KStreamTransform calls this each time it creates a processor. */
+ Transformer<K, V, R> get();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
new file mode 100644
index 0000000..5b9e2ff
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+public interface ValueTransformer<V, R> {
+
+ /**
+ * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
+ * that contains it is initialized.
+ * <p>
+ * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+ * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+ *
+ * @param context the context; may not be null
+ */
+ void init(ProcessorContext context);
+
+ /**
+ * Transform the given message value; the message key is not passed to this method.
+ *
+ * @param value the value for the message
+ * @return the transformed value
+ */
+ R transform(V value);
+
+ /**
+ * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context
+ * during {@link #init(ProcessorContext) initialization}.
+ *
+ * @param timestamp the stream time when this method is being called
+ */
+ void punctuate(long timestamp);
+
+ /**
+ * Close this transformer and clean up any resources.
+ */
+ void close();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
new file mode 100644
index 0000000..5c053c7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface ValueTransformerSupplier<V, R> {
+ /** Returns the {@link ValueTransformer} to use; KStreamTransformValues calls this each time it creates a processor. */
+ ValueTransformer<V, R> get();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cff97d6..8f56e09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -44,6 +46,10 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";
+ private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-";
+
+ private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-";
+
private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";
private static final String BRANCH_NAME = "KAFKA-BRANCH-";
@@ -191,11 +197,27 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
}
@Override
- public <K1, V1> KStream<K1, V1> process(final ProcessorSupplier<K, V> processorSupplier) {
- String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+ public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+ String name = TRANSFORM_NAME + INDEX.getAndIncrement();
- topology.addProcessor(name, processorSupplier, this.name);
+ topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+
+ return new KStreamImpl<>(topology, name);
+ }
+
+ @Override
+ public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier) {
+ String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
+
+ topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
return new KStreamImpl<>(topology, name);
}
+
+ @Override
+ public void process(final ProcessorSupplier<K, V> processorSupplier) {
+ String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+
+ topology.addProcessor(name, processorSupplier, this.name);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
new file mode 100644
index 0000000..7ebab0e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+ // Source of the user-supplied Transformer; queried once per Processor created by get().
+ private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
+
+ public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
+ this.transformerSupplier = transformerSupplier;
+ }
+ // Wraps a Transformer obtained from the supplier in a Processor (parameterized, not raw, to stay type safe).
+ @Override
+ public Processor<K1, V1> get() {
+ return new KStreamTransformProcessor<>(transformerSupplier.get());
+ }
+
+ public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+ // Processor adapter that delegates each callback to the wrapped Transformer.
+ private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
+ private ProcessorContext context;
+
+ public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
+ this.transformer = transformer;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ transformer.init(context);
+ this.context = context;
+ }
+ // Transforms the record and forwards the resulting key and value through the context.
+ @Override
+ public void process(K1 key, V1 value) {
+ KeyValue<K2, V2> pair = transformer.transform(key, value);
+ context.forward(pair.key, pair.value);
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ transformer.punctuate(timestamp);
+ }
+
+ @Override
+ public void close() {
+ transformer.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
new file mode 100644
index 0000000..6f989e6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
+ // Source of the user-supplied ValueTransformer; queried once per Processor created by get().
+ private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
+
+ public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
+ this.valueTransformerSupplier = valueTransformerSupplier;
+ }
+ // Explicit type arguments: K does not appear in the constructor argument, so it cannot be inferred from it.
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamTransformValuesProcessor<K, V, R>(valueTransformerSupplier.get());
+ }
+
+ public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
+ // Processor adapter that delegates each callback to the wrapped ValueTransformer.
+ private final ValueTransformer<V, R> valueTransformer;
+ private ProcessorContext context;
+
+ public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
+ this.valueTransformer = valueTransformer;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ valueTransformer.init(context);
+ this.context = context;
+ }
+ // Forwards the original key unchanged together with the transformed value.
+ @Override
+ public void process(K key, V value) {
+ context.forward(key, valueTransformer.transform(value));
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ valueTransformer.punctuate(timestamp);
+ }
+
+ @Override
+ public void close() {
+ valueTransformer.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 95a923d..4a68332 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -159,7 +159,7 @@ public class StreamThread extends Thread {
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
-
+
private Consumer<byte[], byte[]> createRestoreConsumer() {
log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
return new KafkaConsumer<>(config.getConsumerConfigs(),
@@ -240,9 +240,11 @@ public class StreamThread extends Thread {
// try to fetch some records if necessary
ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
- for (StreamTask task : tasks.values()) {
- for (TopicPartition partition : task.partitions()) {
- task.addRecords(partition, records.records(partition));
+ if (!records.isEmpty()) {
+ for (StreamTask task : tasks.values()) {
+ for (TopicPartition partition : task.partitions()) {
+ task.addRecords(partition, records.records(partition));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 875712a..2db488c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -119,7 +119,7 @@ public class KStreamImplTest {
stream4.to("topic-5");
- stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7");
+ stream5.through("topic-6").process(new MockProcessorSupplier<>());
assertEquals(2 + // sources
2 + // stream1
@@ -131,8 +131,7 @@ public class KStreamImplTest {
2 + 3 + // stream5
1 + // to
2 + // through
- 1 + // process
- 1, // to
+ 1, // process
builder.build().processors().size());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
new file mode 100644
index 0000000..e397dd1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamTransformTest {
+
+ private String topicName = "topic";
+
+ private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+ private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+ // Checks that transform() hands each record to the transformer and passes its output pair to the next processor.
+ @Test
+ public void testTransform() {
+ KStreamBuilder builder = new KStreamBuilder();
+ // Stateful transformer: doubles the key and emits the running total of all values seen so far.
+ TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> transformerSupplier =
+ new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
+ public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
+ return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+
+ private int total = 0;
+
+ @Override
+ public void init(ProcessorContext context) {
+ }
+
+ @Override
+ public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
+ total += value;
+ return KeyValue.pair(key * 2, total);
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ };
+ // Despite the name, these are the input keys; each record's value is its key * 10 (see the loop below).
+ final int[] expectedKeys = {1, 10, 100, 1000};
+
+ KStream<Integer, Integer> stream;
+ MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+ stream = builder.from(keyDeserializer, valDeserializer, topicName);
+ stream.transform(transformerSupplier).process(processor);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+ }
+
+ assertEquals(4, processor.processed.size());
+ // Expected entries read as "doubledKey:runningTotal", e.g. 20:110 is key 10 * 2 and total 10 + 100.
+ String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], processor.processed.get(i));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c67ca658/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
new file mode 100644
index 0000000..c5c9b39
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamTransformValuesTest {
+
+ private String topicName = "topic";
+
+ private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+ private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+ // Checks that transformValues() keeps each record's key and replaces its value with the transformer's output.
+ @Test
+ public void testTransform() {
+ KStreamBuilder builder = new KStreamBuilder();
+ // Stateful value transformer: emits the running total of all values seen so far.
+ ValueTransformerSupplier<Integer, Integer> valueTransformerSupplier =
+ new ValueTransformerSupplier<Integer, Integer>() {
+ public ValueTransformer<Integer, Integer> get() {
+ return new ValueTransformer<Integer, Integer>() {
+
+ private int total = 0;
+
+ @Override
+ public void init(ProcessorContext context) {
+ }
+
+ @Override
+ public Integer transform(Integer value) {
+ total += value;
+ return total;
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ };
+ // Despite the name, these are the input keys; each record's value is its key * 10 (see the loop below).
+ final int[] expectedKeys = {1, 10, 100, 1000};
+
+ KStream<Integer, Integer> stream;
+ MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+ stream = builder.from(keyDeserializer, valDeserializer, topicName);
+ stream.transformValues(valueTransformerSupplier).process(processor);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
+ }
+
+ assertEquals(4, processor.processed.size());
+ // Expected entries read as "originalKey:runningTotal", e.g. 10:110 is key 10 and total 10 + 100.
+ String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], processor.processed.get(i));
+ }
+ }
+
+}