You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:26 UTC

[12/50] [abbrv] kafka git commit: KAFKA-3614: Consolidate duplicate code in KGroupedTableImpl

KAFKA-3614: Consolidate duplicate code in KGroupedTableImpl

Feel free to review: guozhangwang, enothereska, mjsax.

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska

Closes #1262 from miguno/KAFKA-3614


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/088ab3ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/088ab3ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/088ab3ea

Branch: refs/heads/0.10.0
Commit: 088ab3eaadb2389b52aedc049b6f1f0d4b5fb989
Parents: 18dd198
Author: Michael G. Noll <mi...@confluent.io>
Authored: Tue Apr 26 09:59:37 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 09:59:37 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KGroupedTableImpl.java    | 73 ++++++--------------
 1 file changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/088ab3ea/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index f2e2eed..f7fe4e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -67,41 +67,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                                       Serde<T> aggValueSerde,
                                       String name) {
 
-        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
-        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
-        String aggregateName = topology.newName(AGGREGATE_NAME);
-
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
-
-        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
-        Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
-        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
-
         ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
-
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
-
-        // send the aggregate key-value pairs to the intermediate topic for partitioning
-        topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
-
-        // read the intermediate topic
-        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
-        topology.addStateStore(aggregateStore, aggregateName);
-
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
+        return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name);
     }
 
     @Override
@@ -113,14 +80,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, substractor, null, name);
     }
 
-    @Override
-    public KTable<K, V> reduce(Reducer<V> adder,
-                               Reducer<V> subtractor,
-                               String name) {
-
+    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                         Serde<T> aggValueSerde,
+                                         String functionName,
+                                         String name) {
         String sinkName = topology.newName(KStreamImpl.SINK_NAME);
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
-        String reduceName = topology.newName(REDUCE_NAME);
+        String funcName = topology.newName(functionName);
 
         String topic = name + REPARTITION_TOPIC_SUFFIX;
 
@@ -132,13 +98,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
         ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
-        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
-
         StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(valSerde)
-                .persistent()
-                .build();
+            .withKeys(keySerde)
+            .withValues(aggValueSerde)
+            .persistent()
+            .build();
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
@@ -148,11 +112,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, aggregateSupplier, sourceName);
-        topology.addStateStore(aggregateStore, reduceName);
+        topology.addProcessor(funcName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
+        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName));
+    }
+
+    @Override
+    public KTable<K, V> reduce(Reducer<V> adder,
+                               Reducer<V> subtractor,
+                               String name) {
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
+        return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, name);
     }
 
     @Override
@@ -177,4 +149,5 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                 },
                 Serdes.Long(), name);
     }
-}
+
+}
\ No newline at end of file