You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/11 21:33:50 UTC

kafka git commit: KAFKA-3519: Refactor Transformer's transform / punctuate to return nullable values

Repository: kafka
Updated Branches:
  refs/heads/trunk 1ec842a3e -> 40fd45664


KAFKA-3519: Refactor Transformer's transform / punctuate to return nullable values

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Dan Norwood, Anna Povzner

Closes #1204 from guozhangwang/KTransformR


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40fd4566
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40fd4566
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40fd4566

Branch: refs/heads/trunk
Commit: 40fd456649b5df29d030da46865b5e7e0ca6db15
Parents: 1ec842a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Apr 11 12:33:48 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 11 12:33:48 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/Transformer.java    |  9 +++++----
 .../kafka/streams/kstream/ValueTransformer.java      |  5 +++--
 .../kafka/streams/kstream/internals/KStreamImpl.java |  4 +---
 .../streams/kstream/internals/KStreamTransform.java  |  9 +++++++--
 .../kstream/internals/KStreamTransformValues.java    | 11 +++++++----
 .../kafka/streams/kstream/internals/KTableImpl.java  |  2 +-
 .../kstream/internals/KStreamTransformTest.java      | 10 +++++++---
 .../internals/KStreamTransformValuesTest.java        |  8 ++++++--
 .../org/apache/kafka/test/KStreamTestDriver.java     | 15 +++++++++++++++
 .../org/apache/kafka/test/MockProcessorContext.java  |  6 +++---
 .../org/apache/kafka/test/MockProcessorSupplier.java |  3 ++-
 11 files changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 8069dca..5197e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -32,7 +32,7 @@ public interface Transformer<K, V, R> {
      * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
      * that contains it is initialized.
      * <p>
-     * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
      * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
      *
      * @param context the context; may not be null
@@ -44,17 +44,18 @@ public interface Transformer<K, V, R> {
      *
      * @param key the key for the message
      * @param value the value for the message
-     * @return new value
+     * @return new value; if null, no key-value pair will be forwarded downstream
      */
     R transform(K key, V value);
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
      * during {@link #init(ProcessorContext) initialization}.
      *
      * @param timestamp the stream time when this method is being called
+     * @return new value; if null, it will not be forwarded downstream
      */
-    void punctuate(long timestamp);
+    R punctuate(long timestamp);
 
     /**
      * Close this processor and clean up any resources.

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1a0679d..63214fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -47,12 +47,13 @@ public interface ValueTransformer<V, R> {
     R transform(V value);
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
      * during {@link #init(ProcessorContext) initialization}.
      *
      * @param timestamp the stream time when this method is being called
+     * @return new value; if null, it will not be forwarded downstream
      */
-    void punctuate(long timestamp);
+    R punctuate(long timestamp);
 
     /**
      * Close this processor and clean up any resources.

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 9707aee..a02cfb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -86,8 +86,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
 
-    private static final String SELECT_NAME = "KSTREAM-SELECT-";
-
     public static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
@@ -243,7 +241,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public void foreach(ForeachAction<K, V> action) {
         String name = topology.newName(FOREACH_NAME);
 
-        topology.addProcessor(name, new KStreamForeach(action), this.name);
+        topology.addProcessor(name, new KStreamForeach<>(action), this.name);
     }
     public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 09dddfe..af100a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -55,12 +55,17 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
         @Override
         public void process(K1 key, V1 value) {
             KeyValue<K2, V2> pair = transformer.transform(key, value);
-            context().forward(pair.key, pair.value);
+
+            if (pair != null)
+                context().forward(pair.key, pair.value);
         }
 
         @Override
         public void punctuate(long timestamp) {
-            transformer.punctuate(timestamp);
+            KeyValue<K2, V2> pair = transformer.punctuate(timestamp);
+
+            if (pair != null)
+                context().forward(pair.key, pair.value);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 6f989e6..cb9aab1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -27,18 +27,18 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
     private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
 
-    public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
+    public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
         this.valueTransformerSupplier = valueTransformerSupplier;
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
+        return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
     }
 
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
 
-        private final ValueTransformer valueTransformer;
+        private final ValueTransformer<V, R> valueTransformer;
         private ProcessorContext context;
 
         public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
@@ -58,7 +58,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
         @Override
         public void punctuate(long timestamp) {
-            valueTransformer.punctuate(timestamp);
+            R ret = valueTransformer.punctuate(timestamp);
+
+            if (ret != null)
+                context.forward(null, ret);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index adc8b91..ee2c931 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -183,7 +183,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public void foreach(final ForeachAction<K, V> action) {
         String name = topology.newName(FOREACH_NAME);
-        KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach(new ForeachAction<K, Change<V>>() {
+        KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
             @Override
             public void apply(K key, Change<V> value) {
                 action.apply(key, value.newValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 4244de5..a0a61f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -59,7 +59,8 @@ public class KStreamTransformTest {
                         }
 
                         @Override
-                        public void punctuate(long timestamp) {
+                        public KeyValue<Integer, Integer> punctuate(long timestamp) {
+                            return KeyValue.pair(-1, (int) timestamp);
                         }
 
                         @Override
@@ -80,9 +81,12 @@ public class KStreamTransformTest {
             driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
         }
 
-        assertEquals(4, processor.processed.size());
+        driver.punctuate(2);
+        driver.punctuate(3);
 
-        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+        assertEquals(6, processor.processed.size());
+
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.processed.get(i));

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 52abdf7..f5f9698 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -58,7 +58,8 @@ public class KStreamTransformValuesTest {
                         }
 
                         @Override
-                        public void punctuate(long timestamp) {
+                        public Integer punctuate(long timestamp) {
+                            return (int) timestamp;
                         }
 
                         @Override
@@ -82,7 +83,10 @@ public class KStreamTransformValuesTest {
 
         assertEquals(4, processor.processed.size());
 
-        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+        driver.punctuate(2);
+        driver.punctuate(3);
+
+        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110", "null:2", "null:3"};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.processed.get(i));

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 5cfee6b..2ee8730 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -87,6 +87,21 @@ public class KStreamTestDriver {
         }
     }
 
+    public void punctuate(long timestamp) {
+        setTime(timestamp);
+
+        for (ProcessorNode processor : topology.processors()) {
+            if (processor.processor() != null) {
+                currNode = processor;
+                try {
+                    processor.processor().punctuate(timestamp);
+                } finally {
+                    currNode = null;
+                }
+            }
+        }
+    }
+
     public void setTime(long timestamp) {
         context.setTime(timestamp);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 1d478dd..2e2c221 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -171,17 +171,17 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public String topic() {
-        return "mockTopic";
+        return null;
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException("partition() not supported.");
+        return -1;
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException("offset() not supported.");
+        return -1L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd4566/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 921c365..9cf0eb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -57,7 +57,8 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
         @Override
         public void process(K key, V value) {
-            processed.add(key + ":" + value);
+            processed.add((key == null ? "null" : key) + ":" +
+                    (value == null ? "null" : value));
         }
 
         @Override