You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/09 08:14:53 UTC

kafka git commit: MINOR: typing ProcessorDef

Repository: kafka
Updated Branches:
  refs/heads/trunk f1110c3fb -> 5a921a323


MINOR: typing ProcessorDef

guozhangwang
This code change properly types ProcessorDef. This also makes KStream.process() type-safe.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #289 from ymatsuda/typing_ProcessorDef


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a921a32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a921a32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a921a32

Branch: refs/heads/trunk
Commit: 5a921a3239e8b6c6f0fd6a65093c491ac3e539fd
Parents: f1110c3
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Oct 8 23:19:12 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 8 23:19:12 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/streams/kstream/KStream.java  | 2 +-
 .../apache/kafka/streams/kstream/internals/KStreamBranch.java    | 4 ++--
 .../apache/kafka/streams/kstream/internals/KStreamFilter.java    | 4 ++--
 .../apache/kafka/streams/kstream/internals/KStreamFlatMap.java   | 4 ++--
 .../kafka/streams/kstream/internals/KStreamFlatMapValues.java    | 4 ++--
 .../org/apache/kafka/streams/kstream/internals/KStreamImpl.java  | 2 +-
 .../org/apache/kafka/streams/kstream/internals/KStreamJoin.java  | 4 ++--
 .../org/apache/kafka/streams/kstream/internals/KStreamMap.java   | 4 ++--
 .../apache/kafka/streams/kstream/internals/KStreamMapValues.java | 4 ++--
 .../kafka/streams/kstream/internals/KStreamPassThrough.java      | 4 ++--
 .../apache/kafka/streams/kstream/internals/KStreamWindow.java    | 4 ++--
 .../java/org/apache/kafka/streams/processor/ProcessorDef.java    | 4 ++--
 12 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 7f101ab..6c488cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -152,5 +152,5 @@ public interface KStream<K, V> {
      *
      * @param processorDef the class of ProcessorDef
      */
-    <K1, V1> KStream<K1, V1> process(ProcessorDef processorDef);
+    <K1, V1> KStream<K1, V1> process(ProcessorDef<K, V> processorDef);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 6b661b4..c806147 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorDef;
 import org.apache.kafka.streams.kstream.Predicate;
 
-class KStreamBranch<K, V> implements ProcessorDef {
+class KStreamBranch<K, V> implements ProcessorDef<K, V> {
 
     private final Predicate<K, V>[] predicates;
 
@@ -32,7 +32,7 @@ class KStreamBranch<K, V> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K, V> instance() {
         return new KStreamBranchProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index 5444e70..22800f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamFilter<K, V> implements ProcessorDef {
+class KStreamFilter<K, V> implements ProcessorDef<K, V> {
 
     private final Predicate<K, V> predicate;
     private final boolean filterOut;
@@ -33,7 +33,7 @@ class KStreamFilter<K, V> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K, V> instance() {
         return new KStreamFilterProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 410cfda..6c7f4ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
+class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
 
     private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
 
@@ -32,7 +32,7 @@ class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K1, V1> instance() {
         return new KStreamFlatMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index edca421..9cdcdf5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
+class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
 
     private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
 
@@ -31,7 +31,7 @@ class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K1, V1> instance() {
         return new KStreamFlatMapValuesProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 6936648..52c717f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -191,7 +191,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> process(final ProcessorDef processorDef) {
+    public <K1, V1> KStream<K1, V1> process(final ProcessorDef<K, V> processorDef) {
         String name = PROCESSOR_NAME + INDEX.getAndIncrement();
 
         topology.addProcessor(name, processorDef, this.name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
index 4003d29..311efef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorDef;
 
 import java.util.Iterator;
 
-class KStreamJoin<K, V, V1, V2> implements ProcessorDef {
+class KStreamJoin<K, V, V1, V2> implements ProcessorDef<K, V1> {
 
     private static abstract class Finder<K, T> {
         abstract Iterator<T> find(K key, long timestamp);
@@ -41,7 +41,7 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K, V1> instance() {
         return new KStreamJoinProcessor(windowName);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 4d31348..a9a7b24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
+class KStreamMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
 
     private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
 
@@ -32,7 +32,7 @@ class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K1, V1> instance() {
         return new KStreamMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index dac6550..ac39f37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
+class KStreamMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
 
     private final ValueMapper<V1, V2> mapper;
 
@@ -31,7 +31,7 @@ class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K1, V1> instance() {
         return new KStreamMapProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
index ea39550..0f4638d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
@@ -21,10 +21,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorDef;
 
-class KStreamPassThrough<K, V> implements ProcessorDef {
+class KStreamPassThrough<K, V> implements ProcessorDef<K, V> {
 
     @Override
-    public Processor instance() {
+    public Processor<K, V> instance() {
         return new KStreamPassThroughProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
index 6ebcbe1..bdd1323 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorDef;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.kstream.WindowDef;
 
-public class KStreamWindow<K, V> implements ProcessorDef {
+public class KStreamWindow<K, V> implements ProcessorDef<K, V> {
 
     private final WindowDef<K, V> windowDef;
 
@@ -37,7 +37,7 @@ public class KStreamWindow<K, V> implements ProcessorDef {
     }
 
     @Override
-    public Processor instance() {
+    public Processor<K, V> instance() {
         return new KStreamWindowProcessor();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
index 99f0299..a32a899 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.processor;
 
-public interface ProcessorDef {
+public interface ProcessorDef<K, V> {
 
-    Processor instance();
+    Processor<K, V> instance();
 }