You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:26 UTC
[12/50] [abbrv] kafka git commit: KAFKA-3614: Consolidate duplicate
code in KGroupedTableImpl
KAFKA-3614: Consolidate duplicate code in KGroupedTableImpl
Feel free to review guozhangwang enothereska mjsax .
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska
Closes #1262 from miguno/KAFKA-3614
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/088ab3ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/088ab3ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/088ab3ea
Branch: refs/heads/0.10.0
Commit: 088ab3eaadb2389b52aedc049b6f1f0d4b5fb989
Parents: 18dd198
Author: Michael G. Noll <mi...@confluent.io>
Authored: Tue Apr 26 09:59:37 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 09:59:37 2016 -0700
----------------------------------------------------------------------
.../kstream/internals/KGroupedTableImpl.java | 73 ++++++--------------
1 file changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/088ab3ea/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index f2e2eed..f7fe4e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -67,41 +67,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
Serde<T> aggValueSerde,
String name) {
- String sinkName = topology.newName(KStreamImpl.SINK_NAME);
- String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
- String aggregateName = topology.newName(AGGREGATE_NAME);
-
- String topic = name + REPARTITION_TOPIC_SUFFIX;
-
- Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
- Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
- Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
- Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
- ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
- ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
-
ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
-
- StateStoreSupplier aggregateStore = Stores.create(name)
- .withKeys(keySerde)
- .withValues(aggValueSerde)
- .persistent()
- .build();
-
- // send the aggregate key-value pairs to the intermediate topic for partitioning
- topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
-
- // read the intermediate topic
- topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
-
- // aggregate the values with the aggregator and local store
- topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
- topology.addStateStore(aggregateStore, aggregateName);
-
- // return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
+ return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name);
}
@Override
@@ -113,14 +80,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return aggregate(initializer, adder, substractor, null, name);
}
- @Override
- public KTable<K, V> reduce(Reducer<V> adder,
- Reducer<V> subtractor,
- String name) {
-
+ private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
+ Serde<T> aggValueSerde,
+ String functionName,
+ String name) {
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
- String reduceName = topology.newName(REDUCE_NAME);
+ String funcName = topology.newName(functionName);
String topic = name + REPARTITION_TOPIC_SUFFIX;
@@ -132,13 +98,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
- ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
-
StateStoreSupplier aggregateStore = Stores.create(name)
- .withKeys(keySerde)
- .withValues(valSerde)
- .persistent()
- .build();
+ .withKeys(keySerde)
+ .withValues(aggValueSerde)
+ .persistent()
+ .build();
// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addInternalTopic(topic);
@@ -148,11 +112,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
// aggregate the values with the aggregator and local store
- topology.addProcessor(reduceName, aggregateSupplier, sourceName);
- topology.addStateStore(aggregateStore, reduceName);
+ topology.addProcessor(funcName, aggregateSupplier, sourceName);
+ topology.addStateStore(aggregateStore, funcName);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
+ return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName));
+ }
+
+ @Override
+ public KTable<K, V> reduce(Reducer<V> adder,
+ Reducer<V> subtractor,
+ String name) {
+ ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
+ return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, name);
}
@Override
@@ -177,4 +149,5 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
},
Serdes.Long(), name);
}
-}
+
+}
\ No newline at end of file