You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 22:42:21 UTC

kafka git commit: KAFKA-3337: Extract selector as a separate groupBy operator for KTable aggregations

Repository: kafka
Updated Branches:
  refs/heads/trunk 9d71489ff -> 5c547475d


KAFKA-3337: Extract selector as a separate groupBy operator for KTable aggregations

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1231 from mjsax/kafka-3337-extact-key-selector-from-agg


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c547475
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c547475
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c547475

Branch: refs/heads/trunk
Commit: 5c547475d86aa336f8b3c4bb69faff39759d5df5
Parents: 9d71489
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Apr 21 13:42:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 13:42:17 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedTable.java    |  82 +++++++++
 .../apache/kafka/streams/kstream/KTable.java    | 109 ++----------
 .../kstream/internals/KGroupedTableImpl.java    | 172 +++++++++++++++++++
 .../streams/kstream/internals/KTableImpl.java   | 162 ++---------------
 .../kstream/internals/KTableAggregateTest.java  |  14 +-
 .../streams/smoketest/SmokeTestClient.java      |  17 +-
 6 files changed, 291 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
new file mode 100644
index 0000000..86c34b1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table.
+ *
+ * @param <K> Type of primary keys
+ * @param <V> Type of value changes
+ */
+@InterfaceStability.Unstable
+public interface KGroupedTable<K, V> {
+
+    /**
+     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param adder         the instance of {@link Reducer} for addition
+     * @param subtractor    the instance of {@link Reducer} for subtraction
+     * @param name          the name of the resulted {@link KTable}
+     */
+    KTable<K, V> reduce(Reducer<V> adder,
+                        Reducer<V> subtractor,
+                        String name);
+
+    /**
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param substractor   the instance of {@link Aggregator} for subtraction
+     * @param aggValueSerde value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param name          the name of the resulted table
+     * @param <T>           the value type of the aggregated {@link KTable}
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> adder,
+                               Aggregator<K, V, T> substractor,
+                               Serde<T> aggValueSerde,
+                               String name);
+
+    /**
+     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
+     * using default serializers and deserializers.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param substractor   the instance of {@link Aggregator} for subtraction
+     * @param name          the name of the resulted {@link KTable}
+     * @param <T>           the value type of the aggregated {@link KTable}
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> adder,
+                               Aggregator<K, V, T> substractor,
+                               String name);
+
+    /**
+     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
+     *
+     * @param name          the name of the resulted {@link KTable}
+     */
+    KTable<K, Long> count(String name);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1e44cb5..8414279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -242,113 +242,26 @@ public interface KTable<K, V> {
     <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
-     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
-     *
-     * @param adder             the instance of {@link Reducer} for addition
-     * @param subtractor        the instance of {@link Reducer} for subtraction
-     * @param selector          the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param keySerde          key serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param name              the name of the resulted {@link KTable}
-     * @param <K1>              the key type of the aggregated {@link KTable}
-     * @param <V1>              the value type of the aggregated {@link KTable}
-     */
-    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
-                                   Reducer<V1> subtractor,
-                                   KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                   Serde<K1> keySerde,
-                                   Serde<V1> valueSerde,
-                                   String name);
-
-    /**
-     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}
-     * using default serializers and deserializers.
-     *
-     * @param adder         the instance of {@link Reducer} for addition
-     * @param subtractor    the instance of {@link Reducer} for subtraction
-     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted {@link KTable}
-     * @param <K1>          the key type of the aggregated {@link KTable}
-     * @param <V1>          the value type of the aggregated {@link KTable}
-     */
-    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
-                                   Reducer<V1> subtractor,
-                                   KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                   String name);
-
-    /**
-     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
-     *
-     * @param initializer   the instance of {@link Initializer}
-     * @param adder         the instance of {@link Aggregator} for addition
-     * @param substractor   the instance of {@link Aggregator} for subtraction
-     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param keySerde      key serdes for materializing this stream and the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param valueSerde    value serdes for materializing this stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param aggValueSerde value serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted table
-     * @param <K1>          the key type of this {@link KTable}
-     * @param <V1>          the value type of this {@link KTable}
-     * @param <T>           the value type of the aggregated {@link KTable}
-     */
-    <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                        Aggregator<K1, V1, T> adder,
-                                        Aggregator<K1, V1, T> substractor,
-                                        KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                        Serde<K1> keySerde,
-                                        Serde<V1> valueSerde,
-                                        Serde<T> aggValueSerde,
-                                        String name);
-
-    /**
-     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
-     * using default serializers and deserializers.
-     *
-     * @param initializer   the instance of {@link Initializer}
-     * @param adder         the instance of {@link Aggregator} for addition
-     * @param substractor   the instance of {@link Aggregator} for subtraction
-     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted {@link KTable}
-     * @param <K1>          the key type of the aggregated {@link KTable}
-     * @param <V1>          the value type of the aggregated {@link KTable}
-     * @param <T>           the value type of the aggregated {@link KTable}
-     */
-    <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                        Aggregator<K1, V1, T> adder,
-                                        Aggregator<K1, V1, T> substractor,
-                                        KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                        String name);
-
-    /**
-     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
-     *
-     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
+     * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}.
+     * 
+     * @param selector      select the grouping key and value to be aggregated
      * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
      * @param valueSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted table
-     * @param <K1>          the key type of the aggregated {@link KTable}
+     * @param <K1>          the key type of the {@link KGroupedTable}
+     * @param <V1>          the value type of the {@link KGroupedTable}
      */
-    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
-                                Serde<K1> keySerde,
-                                Serde<V> valueSerde,
-                                String name);
+    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
 
     /**
-     * Count number of records of this stream by the selected key into a new instance of {@link KTable}
-     * using default serializers and deserializers.
+     * Group the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers and deserializers.
      *
-     * @param selector      the instance of {@link KeyValueMapper} that select the aggregate key
-     * @param name          the name of the resulted {@link KTable}
-     * @param <K1>          the key type of the aggregated {@link KTable}
+     * @param selector      select the grouping key and value to be aggregated
+     * @param <K1>          the key type of the {@link KGroupedTable}
+     * @param <V1>          the value type of the {@link KGroupedTable}
      */
-    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector);
 
     /**
      * Perform an action on each element of {@link KTable}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
new file mode 100644
index 0000000..d9b0f3d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+
+/**
+ * The implementation class of {@link KGroupedTable}.
+ * 
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> {
+
+    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+
+    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
+
+    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
+
+    private final String sourceName;
+
+    public KGroupedTableImpl(KStreamBuilder topology,
+                             String name,
+                             String sourceName,
+                             Serde<K> keySerde,
+                             Serde<V> valSerde) {
+        super(topology, name, Collections.singleton(sourceName));
+        this.sourceName = sourceName;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                                      Aggregator<K, V, T> adder,
+                                      Aggregator<K, V, T> subtractor,
+                                      Serde<T> aggValueSerde,
+                                      String name) {
+
+        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+        String aggregateName = topology.newName(AGGREGATE_NAME);
+
+        String topic = name + REPARTITION_TOPIC_SUFFIX;
+
+        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
+        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
+
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
+                .persistent()
+                .build();
+
+        // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addInternalTopic(topic);
+        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+
+        // read the intermediate topic
+        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, aggregateName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                            Aggregator<K, V, T> adder,
+                            Aggregator<K, V, T> substractor,
+                            String name) {
+
+        return aggregate(initializer, adder, substractor, null, name);
+    }
+
+    @Override
+    public KTable<K, Long> count(String name) {
+        return this.aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                }, new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate - 1L;
+                    }
+                },
+                Serdes.Long(), name);
+    }
+
+    @Override
+    public KTable<K, V> reduce(Reducer<V> adder,
+                               Reducer<V> subtractor,
+                               String name) {
+
+        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+        String reduceName = topology.newName(REDUCE_NAME);
+
+        String topic = name + REPARTITION_TOPIC_SUFFIX;
+
+        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
+        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+
+        ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
+
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerde)
+                .withValues(valSerde)
+                .persistent()
+                .build();
+
+        // send the aggregate key-value pairs to the intermediate topic for partitioning
+        topology.addInternalTopic(topic);
+        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+
+        // read the intermediate topic
+        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(reduceName, aggregateSupplier, sourceName);
+        topology.addStateStore(aggregateStore, reduceName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5c291f5..51d4cb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,45 +18,38 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.Stores;
 
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.Collections;
 import java.util.Set;
 
 /**
- * The implementation class of KTable
+ * The implementation class of {@link KTable}.
  * @param <K> the key type
  * @param <S> the source's (parent's) value type
  * @param <V> the value type
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
 
-    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
-    private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
-
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
+    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
     public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
 
     public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
@@ -75,16 +68,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
-    private static final String REDUCE_NAME = "KTABLE-REDUCE-";
-
     private static final String SELECT_NAME = "KTABLE-SELECT-";
 
     public static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
-    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
-
     public final ProcessorSupplier<?, ?> processorSupplier;
 
     private final Serde<K> keySerde;
@@ -172,7 +161,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
     }
 
-
     @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
@@ -319,154 +307,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                               Aggregator<K1, V1, T> adder,
-                                               Aggregator<K1, V1, T> subtractor,
-                                               KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                               Serde<K1> keySerde,
-                                               Serde<V1> valueSerde,
-                                               Serde<T> aggValueSerde,
-                                               String name) {
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                                  Serde<K1> keySerde,
+                                                  Serde<V1> valueSerde) {
 
         String selectName = topology.newName(SELECT_NAME);
-        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
-        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
-        String aggregateName = topology.newName(AGGREGATE_NAME);
-
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
-
-        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
-        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
-
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
-
         // select the aggregate key and values (old and new), it would require parent to send old values
         topology.addProcessor(selectName, selectSupplier, this.name);
         this.enableSendingOldValues();
 
-        // send the aggregate key-value pairs to the intermediate topic for partitioning
-        topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
-
-        // read the intermediate topic
-        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
-        topology.addStateStore(aggregateStore, aggregateName);
-
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, Collections.singleton(sourceName));
+        return new KGroupedTableImpl<>(topology, selectName, this.name, keySerde, valueSerde);
     }
 
     @Override
-    public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                               Aggregator<K1, V1, T> adder,
-                                               Aggregator<K1, V1, T> substractor,
-                                               KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                               String name) {
-
-        return aggregate(initializer, adder, substractor, selector, null, null, null, name);
-    }
-
-    @Override
-    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
-                                       Serde<K1> keySerde,
-                                       Serde<V> valueSerde,
-                                       String name) {
-        return this.aggregate(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K1, V, Long>() {
-                    @Override
-                    public Long apply(K1 aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, new Aggregator<K1, V, Long>() {
-                    @Override
-                    public Long apply(K1 aggKey, V value, Long aggregate) {
-                        return aggregate - 1L;
-                    }
-                }, new KeyValueMapper<K, V, KeyValue<K1, V>>() {
-                    @Override
-                    public KeyValue<K1, V> apply(K key, V value) {
-                        return new KeyValue<>(selector.apply(key, value), value);
-                    }
-                },
-                keySerde, valueSerde, Serdes.Long(), name);
-    }
-
-    @Override
-    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, String name) {
-        return count(selector, null, null, name);
-    }
-
-    @Override
-    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
-                                          Reducer<V1> subtractor,
-                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                          Serde<K1> keySerde,
-                                          Serde<V1> valueSerde,
-                                          String name) {
-
-        String selectName = topology.newName(SELECT_NAME);
-        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
-        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
-        String reduceName = topology.newName(REDUCE_NAME);
-
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
-
-        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
-        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
-
-        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
-
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
-
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(valueSerde)
-                .persistent()
-                .build();
-
-        // select the aggregate key and values (old and new), it would require parent to send old values
-        topology.addProcessor(selectName, selectSupplier, this.name);
-        this.enableSendingOldValues();
-
-        // send the aggregate key-value pairs to the intermediate topic for partitioning
-        topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
-
-        // read the intermediate topic
-        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, aggregateSupplier, sourceName);
-        topology.addStateStore(aggregateStore, reduceName);
-
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
-    }
-
-    @Override
-    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
-                                          Reducer<V1> subtractor,
-                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                          String name) {
-
-        return reduce(adder, subtractor, selector, null, null, name);
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) {
+        return this.groupBy(selector, null, null);
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index fc01e5e..1564e95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -72,12 +72,14 @@ public class KTableAggregateTest {
             String topic1 = "topic1";
 
             KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
-            KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(),
-                    new NoOpKeyValueMapper<String, String>(),
-                    stringSerde,
-                    stringSerde,
-                    stringSerde,
-                    "topic1-Canonized");
+            KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(),
+                                                           stringSerde,
+                                                           stringSerde
+            ).aggregate(new StringInit(),
+                        new StringAdd(),
+                        new StringRemove(),
+                        stringSerde,
+                        "topic1-Canonized");
 
             MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c547475/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 0a02824..95e0fbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -220,15 +220,14 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // test repartition
         Agg agg = new Agg();
-        cntTable.aggregate(
-                agg.init(),
-                agg.adder(),
-                agg.remover(),
-                agg.selector(),
-                stringSerde,
-                longSerde,
-                longSerde,
-                "cntByCnt"
+        cntTable.groupBy(agg.selector(),
+                         stringSerde,
+                         longSerde
+        ).aggregate(agg.init(),
+                    agg.adder(),
+                    agg.remover(),
+                    longSerde,
+                    "cntByCnt"
         ).to(stringSerde, longSerde, "tagg");
 
         return new KafkaStreams(builder, props);