You are viewing a plain text version of this content. The canonical (HTML) version of this message is available in the commits@kafka.apache.org mailing-list archive.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 22:42:21 UTC
kafka git commit: KAFKA-3337: Extract selector as a separate groupBy
operator for KTable aggregations
Repository: kafka
Updated Branches:
refs/heads/trunk 9d71489ff -> 5c547475d
KAFKA-3337: Extract selector as a separate groupBy operator for KTable aggregations
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1231 from mjsax/kafka-3337-extact-key-selector-from-agg
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c547475
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c547475
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c547475
Branch: refs/heads/trunk
Commit: 5c547475d86aa336f8b3c4bb69faff39759d5df5
Parents: 9d71489
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Apr 21 13:42:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 13:42:17 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedTable.java | 82 +++++++++
.../apache/kafka/streams/kstream/KTable.java | 109 ++----------
.../kstream/internals/KGroupedTableImpl.java | 172 +++++++++++++++++++
.../streams/kstream/internals/KTableImpl.java | 162 ++---------------
.../kstream/internals/KTableAggregateTest.java | 14 +-
.../streams/smoketest/SmokeTestClient.java | 17 +-
6 files changed, 291 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
new file mode 100644
index 0000000..86c34b1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
+ * obtained by re-keying a {@link KTable} via {@link KTable#groupBy(KeyValueMapper)}.
+ * <p>
+ * Because the grouped records are table updates, {@code reduce} and {@code aggregate} take both an
+ * "adder" for values entering a group and a "subtractor" for values leaving it.
+ *
+ * @param <K> Type of primary keys
+ * @param <V> Type of value changes
+ */
+@InterfaceStability.Unstable
+public interface KGroupedTable<K, V> {
+
+    /**
+     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param adder      the instance of {@link Reducer} for addition
+     * @param subtractor the instance of {@link Reducer} for subtraction
+     * @param name       the name of the resulting {@link KTable}
+     * @return a {@link KTable} that contains the reduced value for each key
+     */
+    KTable<K, V> reduce(Reducer<V> adder,
+                        Reducer<V> subtractor,
+                        String name);
+
+    /**
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param subtractor    the instance of {@link Aggregator} for subtraction
+     * @param aggValueSerde value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulting table
+     * @param <T>           the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains the aggregated value for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> adder,
+                               Aggregator<K, V, T> subtractor,
+                               Serde<T> aggValueSerde,
+                               String name);
+
+    /**
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
+     *
+     * @param initializer the instance of {@link Initializer}
+     * @param adder       the instance of {@link Aggregator} for addition
+     * @param subtractor  the instance of {@link Aggregator} for subtraction
+     * @param name        the name of the resulting {@link KTable}
+     * @param <T>         the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains the aggregated value for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> adder,
+                               Aggregator<K, V, T> subtractor,
+                               String name);
+
+    /**
+     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param name the name of the resulting {@link KTable}
+     * @return a {@link KTable} that contains the number of records for each key
+     */
+    KTable<K, Long> count(String name);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1e44cb5..8414279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -242,113 +242,26 @@ public interface KTable<K, V> {
<V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
- * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
- *
- * @param adder the instance of {@link Reducer} for addition
- * @param subtractor the instance of {@link Reducer} for subtraction
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
- * @param keySerde key serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param valueSerde value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param name the name of the resulted {@link KTable}
- * @param <K1> the key type of the aggregated {@link KTable}
- * @param <V1> the value type of the aggregated {@link KTable}
- */
- <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
- Reducer<V1> subtractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serde<K1> keySerde,
- Serde<V1> valueSerde,
- String name);
-
- /**
- * Combine updating values of this stream by the selected key into a new instance of {@link KTable}
- * using default serializers and deserializers.
- *
- * @param adder the instance of {@link Reducer} for addition
- * @param subtractor the instance of {@link Reducer} for subtraction
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
- * @param name the name of the resulted {@link KTable}
- * @param <K1> the key type of the aggregated {@link KTable}
- * @param <V1> the value type of the aggregated {@link KTable}
- */
- <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
- Reducer<V1> subtractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- String name);
-
- /**
- * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
- *
- * @param initializer the instance of {@link Initializer}
- * @param adder the instance of {@link Aggregator} for addition
- * @param substractor the instance of {@link Aggregator} for subtraction
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
- * @param keySerde key serdes for materializing this stream and the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param valueSerde value serdes for materializing this stream,
- * if not specified the default serdes defined in the configs will be used
- * @param aggValueSerde value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param name the name of the resulted table
- * @param <K1> the key type of this {@link KTable}
- * @param <V1> the value type of this {@link KTable}
- * @param <T> the value type of the aggregated {@link KTable}
- */
- <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
- Aggregator<K1, V1, T> adder,
- Aggregator<K1, V1, T> substractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serde<K1> keySerde,
- Serde<V1> valueSerde,
- Serde<T> aggValueSerde,
- String name);
-
- /**
- * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
- * using default serializers and deserializers.
- *
- * @param initializer the instance of {@link Initializer}
- * @param adder the instance of {@link Aggregator} for addition
- * @param substractor the instance of {@link Aggregator} for subtraction
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
- * @param name the name of the resulted {@link KTable}
- * @param <K1> the key type of the aggregated {@link KTable}
- * @param <V1> the value type of the aggregated {@link KTable}
- * @param <T> the value type of the aggregated {@link KTable}
- */
- <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
- Aggregator<K1, V1, T> adder,
- Aggregator<K1, V1, T> substractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- String name);
-
- /**
- * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
- *
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
+ * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}.
+ *
+ * @param selector select the grouping key and value to be aggregated
* @param keySerde key serdes for materializing this stream,
* if not specified the default serdes defined in the configs will be used
* @param valueSerde value serdes for materializing this stream,
* if not specified the default serdes defined in the configs will be used
- * @param name the name of the resulted table
- * @param <K1> the key type of the aggregated {@link KTable}
+ * @param <K1> the key type of the {@link KGroupedTable}
+ * @param <V1> the value type of the {@link KGroupedTable}
*/
- <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
- Serde<K1> keySerde,
- Serde<V> valueSerde,
- String name);
+ <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
/**
- * Count number of records of this stream by the selected key into a new instance of {@link KTable}
- * using default serializers and deserializers.
+ * Group the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers and deserializers.
*
- * @param selector the instance of {@link KeyValueMapper} that select the aggregate key
- * @param name the name of the resulted {@link KTable}
- * @param <K1> the key type of the aggregated {@link KTable}
+ * @param selector select the grouping key and value to be aggregated
+ * @param <K1> the key type of the {@link KGroupedTable}
+ * @param <V1> the value type of the {@link KGroupedTable}
*/
- <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+ <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector);
/**
* Perform an action on each element of {@link KTable}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
new file mode 100644
index 0000000..d9b0f3d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * The implementation class of {@link KGroupedTable}.
+ * <p>
+ * Each {@code reduce}, {@code aggregate} and {@code count} call appends the same sub-topology to the
+ * builder: the records emitted by the parent processor are written to an internal
+ * {@code <name>-repartition} topic, read back from it, and folded into a persistent state store
+ * named {@code <name>}, where {@code <name>} is the name passed to that call.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> {
+
+    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+
+    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
+
+    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
+
+    /**
+     * @param topology   the builder that the aggregation processors are added to
+     * @param name       the name of the parent processor whose output is grouped
+     * @param sourceName wrapped in a singleton set and passed to the {@link AbstractStream} constructor
+     * @param keySerde   serde for the grouping key; must be non-null once an aggregation is added
+     * @param valSerde   serde for the grouped value; must be non-null once an aggregation is added
+     */
+    public KGroupedTableImpl(KStreamBuilder topology,
+                             String name,
+                             String sourceName,
+                             Serde<K> keySerde,
+                             Serde<V> valSerde) {
+        super(topology, name, Collections.singleton(sourceName));
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                                      Aggregator<K, V, T> adder,
+                                      Aggregator<K, V, T> subtractor,
+                                      Serde<T> aggValueSerde,
+                                      String name) {
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
+        return doAggregate(aggregateSupplier, AGGREGATE_NAME, aggValueSerde, name);
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                                      Aggregator<K, V, T> adder,
+                                      Aggregator<K, V, T> subtractor,
+                                      String name) {
+        // a null aggregate value serde is handed to the state store as-is
+        return aggregate(initializer, adder, subtractor, null, name);
+    }
+
+    @Override
+    public KTable<K, Long> count(String name) {
+        return this.aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate - 1L;
+                    }
+                },
+                Serdes.Long(), name);
+    }
+
+    @Override
+    public KTable<K, V> reduce(Reducer<V> adder,
+                               Reducer<V> subtractor,
+                               String name) {
+        ProcessorSupplier<K, Change<V>> reduceSupplier = new KTableReduce<>(name, adder, subtractor);
+        // a reduce keeps the input value type, so the grouped value serde also materializes the store
+        return doAggregate(reduceSupplier, REDUCE_NAME, valSerde, name);
+    }
+
+    /**
+     * Adds the sub-topology shared by {@code aggregate}, {@code reduce} and {@code count}: a sink into
+     * the internal repartition topic, a source reading it back, the given processor on top of that
+     * source, and a persistent state store named {@code name} attached to the processor.
+     *
+     * @param aggregateSupplier the processor that folds the repartitioned {@link Change}s into the store
+     * @param functionName      name prefix for the new processor node
+     * @param aggValueSerde     serde for the store values; {@code null} is passed to the store as-is
+     * @param name              name of the state store and prefix of the repartition topic
+     * @param <T>               the value type of the resulting table
+     * @return the {@link KTable} produced by the new processor
+     * @throws NullPointerException if this grouped table has no key serde or no value serde
+     */
+    private <T> KTable<K, T> doAggregate(ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                         String functionName,
+                                         Serde<T> aggValueSerde,
+                                         String name) {
+        // Both serdes are dereferenced below to build the repartition sink/source and the Changed(De)Serializer,
+        // so fail with a descriptive message instead of an anonymous NullPointerException.
+        // TODO(review): KTable#groupBy documents that null serdes fall back to the configured defaults;
+        // that fallback is not implemented on this path yet.
+        Objects.requireNonNull(keySerde, "keySerde must not be null: pass an explicit key serde to KTable#groupBy");
+        Objects.requireNonNull(valSerde, "valSerde must not be null: pass an explicit value serde to KTable#groupBy");
+
+        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+        String funcName = topology.newName(functionName);
+
+        String topic = name + REPARTITION_TOPIC_SUFFIX;
+
+        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
+        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
+                .persistent()
+                .build();
+
+        // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addInternalTopic(topic);
+        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+
+        // read the intermediate topic
+        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(funcName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, funcName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5c291f5..51d4cb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,45 +18,38 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.Stores;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
-import java.util.Collections;
import java.util.Set;
/**
- * The implementation class of KTable
+ * The implementation class of {@link KTable}.
* @param <K> the key type
* @param <S> the source's (parent's) value type
* @param <V> the value type
*/
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
- private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
- private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
-
private static final String FILTER_NAME = "KTABLE-FILTER-";
+ private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
@@ -75,16 +68,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
- private static final String REDUCE_NAME = "KTABLE-REDUCE-";
-
private static final String SELECT_NAME = "KTABLE-SELECT-";
public static final String SOURCE_NAME = "KTABLE-SOURCE-";
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
- private static final String FOREACH_NAME = "KTABLE-FOREACH-";
-
public final ProcessorSupplier<?, ?> processorSupplier;
private final Serde<K> keySerde;
@@ -172,7 +161,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
}
-
@Override
public KTable<K, V> through(Serde<K> keySerde,
Serde<V> valSerde,
@@ -319,154 +307,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
- Aggregator<K1, V1, T> adder,
- Aggregator<K1, V1, T> subtractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serde<K1> keySerde,
- Serde<V1> valueSerde,
- Serde<T> aggValueSerde,
- String name) {
+ public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serde<K1> keySerde,
+ Serde<V1> valueSerde) {
String selectName = topology.newName(SELECT_NAME);
- String sinkName = topology.newName(KStreamImpl.SINK_NAME);
- String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
- String aggregateName = topology.newName(AGGREGATE_NAME);
-
- String topic = name + REPARTITION_TOPIC_SUFFIX;
-
- ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
- ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
-
- StateStoreSupplier aggregateStore = Stores.create(name)
- .withKeys(keySerde)
- .withValues(aggValueSerde)
- .persistent()
- .build();
-
// select the aggregate key and values (old and new), it would require parent to send old values
topology.addProcessor(selectName, selectSupplier, this.name);
this.enableSendingOldValues();
- // send the aggregate key-value pairs to the intermediate topic for partitioning
- topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
-
- // read the intermediate topic
- topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
-
- // aggregate the values with the aggregator and local store
- topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
- topology.addStateStore(aggregateStore, aggregateName);
-
- // return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
+ return new KGroupedTableImpl<>(topology, selectName, this.name, keySerde, valueSerde);
}
@Override
- public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
- Aggregator<K1, V1, T> adder,
- Aggregator<K1, V1, T> substractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- String name) {
-
- return aggregate(initializer, adder, substractor, selector, null, null, null, name);
- }
-
- @Override
- public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
- Serde<K1> keySerde,
- Serde<V> valueSerde,
- String name) {
- return this.aggregate(
- new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<K1, V, Long>() {
- @Override
- public Long apply(K1 aggKey, V value, Long aggregate) {
- return aggregate + 1L;
- }
- }, new Aggregator<K1, V, Long>() {
- @Override
- public Long apply(K1 aggKey, V value, Long aggregate) {
- return aggregate - 1L;
- }
- }, new KeyValueMapper<K, V, KeyValue<K1, V>>() {
- @Override
- public KeyValue<K1, V> apply(K key, V value) {
- return new KeyValue<>(selector.apply(key, value), value);
- }
- },
- keySerde, valueSerde, Serdes.Long(), name);
- }
-
- @Override
- public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, String name) {
- return count(selector, null, null, name);
- }
-
- @Override
- public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
- Reducer<V1> subtractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serde<K1> keySerde,
- Serde<V1> valueSerde,
- String name) {
-
- String selectName = topology.newName(SELECT_NAME);
- String sinkName = topology.newName(KStreamImpl.SINK_NAME);
- String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
- String reduceName = topology.newName(REDUCE_NAME);
-
- String topic = name + REPARTITION_TOPIC_SUFFIX;
-
- ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
- ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
-
- KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
-
- ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
-
- StateStoreSupplier aggregateStore = Stores.create(name)
- .withKeys(keySerde)
- .withValues(valueSerde)
- .persistent()
- .build();
-
- // select the aggregate key and values (old and new), it would require parent to send old values
- topology.addProcessor(selectName, selectSupplier, this.name);
- this.enableSendingOldValues();
-
- // send the aggregate key-value pairs to the intermediate topic for partitioning
- topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
-
- // read the intermediate topic
- topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
-
- // aggregate the values with the aggregator and local store
- topology.addProcessor(reduceName, aggregateSupplier, sourceName);
- topology.addStateStore(aggregateStore, reduceName);
-
- // return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
- }
-
- @Override
- public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
- Reducer<V1> subtractor,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- String name) {
-
- return reduce(adder, subtractor, selector, null, null, name);
+ public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) {
+ return this.groupBy(selector, null, null);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index fc01e5e..1564e95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -72,12 +72,14 @@ public class KTableAggregateTest {
String topic1 = "topic1";
KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
- KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(),
- new NoOpKeyValueMapper<String, String>(),
- stringSerde,
- stringSerde,
- stringSerde,
- "topic1-Canonized");
+ KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(),
+ stringSerde,
+ stringSerde
+ ).aggregate(new StringInit(),
+ new StringAdd(),
+ new StringRemove(),
+ stringSerde,
+ "topic1-Canonized");
MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 0a02824..95e0fbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -220,15 +220,14 @@ public class SmokeTestClient extends SmokeTestUtil {
// test repartition
Agg agg = new Agg();
- cntTable.aggregate(
- agg.init(),
- agg.adder(),
- agg.remover(),
- agg.selector(),
- stringSerde,
- longSerde,
- longSerde,
- "cntByCnt"
+ cntTable.groupBy(agg.selector(),
+ stringSerde,
+ longSerde
+ ).aggregate(agg.init(),
+ agg.adder(),
+ agg.remover(),
+ longSerde,
+ "cntByCnt"
).to(stringSerde, longSerde, "tagg");
return new KafkaStreams(builder, props);