You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/11 21:33:50 UTC
kafka git commit: KAFKA-3519: Refactor Transformer's transform /
punctuate to return nullable values
Repository: kafka
Updated Branches:
refs/heads/trunk 1ec842a3e -> 40fd45664
KAFKA-3519: Refactor Transformer's transform / punctuate to return nullable values
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Dan Norwood, Anna Povzner
Closes #1204 from guozhangwang/KTransformR
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40fd4566
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40fd4566
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40fd4566
Branch: refs/heads/trunk
Commit: 40fd456649b5df29d030da46865b5e7e0ca6db15
Parents: 1ec842a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Apr 11 12:33:48 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 11 12:33:48 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/Transformer.java | 9 +++++----
.../kafka/streams/kstream/ValueTransformer.java | 5 +++--
.../kafka/streams/kstream/internals/KStreamImpl.java | 4 +---
.../streams/kstream/internals/KStreamTransform.java | 9 +++++++--
.../kstream/internals/KStreamTransformValues.java | 11 +++++++----
.../kafka/streams/kstream/internals/KTableImpl.java | 2 +-
.../kstream/internals/KStreamTransformTest.java | 10 +++++++---
.../internals/KStreamTransformValuesTest.java | 8 ++++++--
.../org/apache/kafka/test/KStreamTestDriver.java | 15 +++++++++++++++
.../org/apache/kafka/test/MockProcessorContext.java | 6 +++---
.../org/apache/kafka/test/MockProcessorSupplier.java | 3 ++-
11 files changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 8069dca..5197e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -32,7 +32,7 @@ public interface Transformer<K, V, R> {
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* <p>
- * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+ * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
*
* @param context the context; may not be null
@@ -44,17 +44,18 @@ public interface Transformer<K, V, R> {
*
* @param key the key for the message
* @param value the value for the message
- * @return new value
+ * @return new value; if null, no key-value pair will be forwarded downstream
*/
R transform(K key, V value);
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+ * Perform any periodic operations and possibly generate a new key-value pair, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
* during {@link #init(ProcessorContext) initialization}.
*
* @param timestamp the stream time when this method is being called
+ * @return new value; if null, it will not be forwarded downstream
*/
- void punctuate(long timestamp);
+ R punctuate(long timestamp);
/**
* Close this processor and clean up any resources.
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1a0679d..63214fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -47,12 +47,13 @@ public interface ValueTransformer<V, R> {
R transform(V value);
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+ * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
* during {@link #init(ProcessorContext) initialization}.
*
* @param timestamp the stream time when this method is being called
+ * @return new value; if null, it will not be forwarded downstream
*/
- void punctuate(long timestamp);
+ R punctuate(long timestamp);
/**
* Close this processor and clean up any resources.
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 9707aee..a02cfb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -86,8 +86,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
- private static final String SELECT_NAME = "KSTREAM-SELECT-";
-
public static final String SINK_NAME = "KSTREAM-SINK-";
public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
@@ -243,7 +241,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public void foreach(ForeachAction<K, V> action) {
String name = topology.newName(FOREACH_NAME);
- topology.addProcessor(name, new KStreamForeach(action), this.name);
+ topology.addProcessor(name, new KStreamForeach<>(action), this.name);
}
public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
return through(keySerde, valSerde, null, topic);
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 09dddfe..af100a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -55,12 +55,17 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
@Override
public void process(K1 key, V1 value) {
KeyValue<K2, V2> pair = transformer.transform(key, value);
- context().forward(pair.key, pair.value);
+
+ if (pair != null)
+ context().forward(pair.key, pair.value);
}
@Override
public void punctuate(long timestamp) {
- transformer.punctuate(timestamp);
+ KeyValue<K2, V2> pair = transformer.punctuate(timestamp);
+
+ if (pair != null)
+ context().forward(pair.key, pair.value);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 6f989e6..cb9aab1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -27,18 +27,18 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
- public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
+ public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
this.valueTransformerSupplier = valueTransformerSupplier;
}
@Override
public Processor<K, V> get() {
- return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
+ return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
}
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
- private final ValueTransformer valueTransformer;
+ private final ValueTransformer<V, R> valueTransformer;
private ProcessorContext context;
public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
@@ -58,7 +58,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
@Override
public void punctuate(long timestamp) {
- valueTransformer.punctuate(timestamp);
+ R ret = valueTransformer.punctuate(timestamp);
+
+ if (ret != null)
+ context.forward(null, ret);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index adc8b91..ee2c931 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -183,7 +183,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public void foreach(final ForeachAction<K, V> action) {
String name = topology.newName(FOREACH_NAME);
- KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach(new ForeachAction<K, Change<V>>() {
+ KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
@Override
public void apply(K key, Change<V> value) {
action.apply(key, value.newValue);
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 4244de5..a0a61f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -59,7 +59,8 @@ public class KStreamTransformTest {
}
@Override
- public void punctuate(long timestamp) {
+ public KeyValue<Integer, Integer> punctuate(long timestamp) {
+ return KeyValue.pair(-1, (int) timestamp);
}
@Override
@@ -80,9 +81,12 @@ public class KStreamTransformTest {
driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
}
- assertEquals(4, processor.processed.size());
+ driver.punctuate(2);
+ driver.punctuate(3);
- String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+ assertEquals(6, processor.processed.size());
+
+ String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.processed.get(i));
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 52abdf7..f5f9698 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -58,7 +58,8 @@ public class KStreamTransformValuesTest {
}
@Override
- public void punctuate(long timestamp) {
+ public Integer punctuate(long timestamp) {
+ return (int) timestamp;
}
@Override
@@ -82,7 +83,10 @@ public class KStreamTransformValuesTest {
assertEquals(4, processor.processed.size());
- String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+ driver.punctuate(2);
+ driver.punctuate(3);
+
+ String[] expected = {"1:10", "10:110", "100:1110", "1000:11110", "null:2", "null:3"};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.processed.get(i));
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 5cfee6b..2ee8730 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -87,6 +87,21 @@ public class KStreamTestDriver {
}
}
+ public void punctuate(long timestamp) {
+ setTime(timestamp);
+
+ for (ProcessorNode processor : topology.processors()) {
+ if (processor.processor() != null) {
+ currNode = processor;
+ try {
+ processor.processor().punctuate(timestamp);
+ } finally {
+ currNode = null;
+ }
+ }
+ }
+ }
+
public void setTime(long timestamp) {
context.setTime(timestamp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 1d478dd..2e2c221 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -171,17 +171,17 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@Override
public String topic() {
- return "mockTopic";
+ return null;
}
@Override
public int partition() {
- throw new UnsupportedOperationException("partition() not supported.");
+ return -1;
}
@Override
public long offset() {
- throw new UnsupportedOperationException("offset() not supported.");
+ return -1L;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 921c365..9cf0eb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -57,7 +57,8 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
@Override
public void process(K key, V value) {
- processed.add(key + ":" + value);
+ processed.add((key == null ? "null" : key) + ":" +
+ (value == null ? "null" : value));
}
@Override