You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/09 08:14:53 UTC
kafka git commit: MINOR: typing ProcessorDef
Repository: kafka
Updated Branches:
refs/heads/trunk f1110c3fb -> 5a921a323
MINOR: typing ProcessorDef
guozhangwang
This code change properly types ProcessorDef. This also makes KStream.process() typesafe.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes #289 from ymatsuda/typing_ProcessorDef
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a921a32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a921a32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a921a32
Branch: refs/heads/trunk
Commit: 5a921a3239e8b6c6f0fd6a65093c491ac3e539fd
Parents: f1110c3
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Oct 8 23:19:12 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 8 23:19:12 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/kafka/streams/kstream/KStream.java | 2 +-
.../apache/kafka/streams/kstream/internals/KStreamBranch.java | 4 ++--
.../apache/kafka/streams/kstream/internals/KStreamFilter.java | 4 ++--
.../apache/kafka/streams/kstream/internals/KStreamFlatMap.java | 4 ++--
.../kafka/streams/kstream/internals/KStreamFlatMapValues.java | 4 ++--
.../org/apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +-
.../org/apache/kafka/streams/kstream/internals/KStreamJoin.java | 4 ++--
.../org/apache/kafka/streams/kstream/internals/KStreamMap.java | 4 ++--
.../apache/kafka/streams/kstream/internals/KStreamMapValues.java | 4 ++--
.../kafka/streams/kstream/internals/KStreamPassThrough.java | 4 ++--
.../apache/kafka/streams/kstream/internals/KStreamWindow.java | 4 ++--
.../java/org/apache/kafka/streams/processor/ProcessorDef.java | 4 ++--
12 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 7f101ab..6c488cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -152,5 +152,5 @@ public interface KStream<K, V> {
*
* @param processorDef the class of ProcessorDef
*/
- <K1, V1> KStream<K1, V1> process(ProcessorDef processorDef);
+ <K1, V1> KStream<K1, V1> process(ProcessorDef<K, V> processorDef);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 6b661b4..c806147 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.kstream.Predicate;
-class KStreamBranch<K, V> implements ProcessorDef {
+class KStreamBranch<K, V> implements ProcessorDef<K, V> {
private final Predicate<K, V>[] predicates;
@@ -32,7 +32,7 @@ class KStreamBranch<K, V> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K, V> instance() {
return new KStreamBranchProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index 5444e70..22800f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamFilter<K, V> implements ProcessorDef {
+class KStreamFilter<K, V> implements ProcessorDef<K, V> {
private final Predicate<K, V> predicate;
private final boolean filterOut;
@@ -33,7 +33,7 @@ class KStreamFilter<K, V> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K, V> instance() {
return new KStreamFilterProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 410cfda..6c7f4ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
+class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
@@ -32,7 +32,7 @@ class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K1, V1> instance() {
return new KStreamFlatMapProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index edca421..9cdcdf5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
+class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
@@ -31,7 +31,7 @@ class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K1, V1> instance() {
return new KStreamFlatMapValuesProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 6936648..52c717f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -191,7 +191,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
}
@Override
- public <K1, V1> KStream<K1, V1> process(final ProcessorDef processorDef) {
+ public <K1, V1> KStream<K1, V1> process(final ProcessorDef<K, V> processorDef) {
String name = PROCESSOR_NAME + INDEX.getAndIncrement();
topology.addProcessor(name, processorDef, this.name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
index 4003d29..311efef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorDef;
import java.util.Iterator;
-class KStreamJoin<K, V, V1, V2> implements ProcessorDef {
+class KStreamJoin<K, V, V1, V2> implements ProcessorDef<K, V1> {
private static abstract class Finder<K, T> {
abstract Iterator<T> find(K key, long timestamp);
@@ -41,7 +41,7 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K, V1> instance() {
return new KStreamJoinProcessor(windowName);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 4d31348..a9a7b24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
+class KStreamMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
@@ -32,7 +32,7 @@ class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K1, V1> instance() {
return new KStreamMapProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index dac6550..ac39f37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
+class KStreamMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
private final ValueMapper<V1, V2> mapper;
@@ -31,7 +31,7 @@ class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K1, V1> instance() {
return new KStreamMapProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
index ea39550..0f4638d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
@@ -21,10 +21,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
-class KStreamPassThrough<K, V> implements ProcessorDef {
+class KStreamPassThrough<K, V> implements ProcessorDef<K, V> {
@Override
- public Processor instance() {
+ public Processor<K, V> instance() {
return new KStreamPassThroughProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
index 6ebcbe1..bdd1323 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.kstream.WindowDef;
-public class KStreamWindow<K, V> implements ProcessorDef {
+public class KStreamWindow<K, V> implements ProcessorDef<K, V> {
private final WindowDef<K, V> windowDef;
@@ -37,7 +37,7 @@ public class KStreamWindow<K, V> implements ProcessorDef {
}
@Override
- public Processor instance() {
+ public Processor<K, V> instance() {
return new KStreamWindowProcessor();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a921a32/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
index 99f0299..a32a899 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.processor;
-public interface ProcessorDef {
+public interface ProcessorDef<K, V> {
- Processor instance();
+ Processor<K, V> instance();
}