You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/19 21:35:30 UTC

[incubator-heron] branch master updated: Nwang/refactor grouping (#3040)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 72293cf  Nwang/refactor grouping (#3040)
72293cf is described below

commit 72293cf86ef0fce4f53e37107ba88f37c746c618
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Fri Oct 19 14:35:26 2018 -0700

    Nwang/refactor grouping (#3040)
    
    * Refactor grouping in BoltDeclarer
    
    * clean up
    
    * fix direct grouping
    
    * clean up
---
 ...mStreamGrouping.java => AllStreamGrouping.java} | 37 ++++------
 .../heron/api/grouping/CustomStreamGrouping.java   | 28 +++++++-
 ...reamGrouping.java => DirectStreamGrouping.java} | 38 ++++------
 .../heron/api/grouping/FieldsStreamGrouping.java   | 54 ++++++++++++++
 ...reamGrouping.java => GlobalStreamGrouping.java} | 38 ++++------
 ...StreamGrouping.java => NoneStreamGrouping.java} | 37 ++++------
 ...eamGrouping.java => ShuffleStreamGrouping.java} | 38 ++++------
 ...stomStreamGrouping.java => StreamGrouping.java} | 31 +++-----
 .../apache/heron/api/topology/BoltDeclarer.java    | 82 +++++++---------------
 9 files changed, 189 insertions(+), 194 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
index 627c385..1310000 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
@@ -19,31 +19,22 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
 
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy in which every tuple is sent to all instances of a bolt.
+ */
+public class AllStreamGrouping implements StreamGrouping {
+
+  public AllStreamGrouping() { }
 
-public interface CustomStreamGrouping extends Serializable {
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
 
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.ALL);
 
-  /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
-   */
-  List<Integer> chooseTasks(List<Object> values);
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
index 627c385..57db97d 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
@@ -19,12 +19,18 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
 import java.util.List;
 
+import com.google.protobuf.ByteString;
+
+import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.utils.Utils;
 
-public interface CustomStreamGrouping extends Serializable {
+/**
+ * This is the interface for user defined stream grouping strategies.
+ */
+public interface CustomStreamGrouping extends StreamGrouping {
 
   /**
    * Tells the stream grouping at runtime the tasks in the target bolt.
@@ -46,4 +52,22 @@ public interface CustomStreamGrouping extends Serializable {
    * @param values the values to group on
    */
   List<Integer> chooseTasks(List<Object> values);
+
+  /**
+   * Build InputStream for CustomStreamGrouping implementations.
+   * @param componentName The parent component of this grouping logic
+   * @param streamId The id of the input stream
+   * @return An InputStream builder to be used by BoltDeclarer
+   */
+  default TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
+
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.CUSTOM);
+    bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
+    bldr.setCustomGroupingObject(ByteString.copyFrom(Utils.serialize(this)));
+
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
index 627c385..2b0d8b3 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
@@ -19,31 +19,23 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
 
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy in which tuples are sent to the instance of choice.
+ */
+public class DirectStreamGrouping implements StreamGrouping {
+
+  public DirectStreamGrouping() { }
 
-public interface CustomStreamGrouping extends Serializable {
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
 
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.DIRECT);
+    bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
 
-  /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
-   */
-  List<Integer> chooseTasks(List<Object> values);
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java
new file mode 100644
index 0000000..5e3ef29
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.grouping;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.tuple.Fields;
+
+/**
+ * This is the stream grouping strategy in which tuples are sent to a particular instance of
+ * the downstream bolt based on the values of the specified fields.
+ */
+public class FieldsStreamGrouping implements StreamGrouping {
+  private Fields fields;
+
+  public FieldsStreamGrouping(Fields fields) {
+    this.fields = fields;
+  }
+
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
+
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.FIELDS);
+    TopologyAPI.StreamSchema.Builder gfbldr = TopologyAPI.StreamSchema.newBuilder();
+    for (int i = 0; i < fields.size(); ++i) {
+      TopologyAPI.StreamSchema.KeyType.Builder ktBldr =
+          TopologyAPI.StreamSchema.KeyType.newBuilder();
+      ktBldr.setKey(fields.get(i));
+      ktBldr.setType(TopologyAPI.Type.OBJECT);
+      gfbldr.addKeys(ktBldr);
+    }
+    bldr.setGroupingFields(gfbldr);
+
+    return bldr;
+  }
+}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
index 627c385..cf119e6 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
@@ -19,31 +19,23 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
 
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy in which all tuples are sent to the single instance
+ * of a bolt that has the lowest task id.
+ */
+public class GlobalStreamGrouping implements StreamGrouping {
+
+  public GlobalStreamGrouping() { }
 
-public interface CustomStreamGrouping extends Serializable {
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
 
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.LOWEST);
 
-  /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
-   */
-  List<Integer> chooseTasks(List<Object> values);
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
index 627c385..9e7c5a2 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
@@ -19,31 +19,22 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
 
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that is the same as shuffle grouping.
+ */
+public class NoneStreamGrouping implements StreamGrouping {
+
+  public NoneStreamGrouping() { }
 
-public interface CustomStreamGrouping extends Serializable {
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
 
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.NONE);
 
-  /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
-   */
-  List<Integer> chooseTasks(List<Object> values);
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
index 627c385..eb06336 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
@@ -19,31 +19,23 @@
 
 package org.apache.heron.api.grouping;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
 
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy in which tuples are randomly distributed to instances of
+ * the bolt.
+ */
+public class ShuffleStreamGrouping implements StreamGrouping {
+
+  public ShuffleStreamGrouping() { }
 
-public interface CustomStreamGrouping extends Serializable {
+  public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
 
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+    bldr.setStream(
+        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+    bldr.setGtype(TopologyAPI.Grouping.SHUFFLE);
 
-  /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
-   */
-  List<Integer> chooseTasks(List<Object> values);
+    return bldr;
+  }
 }
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
similarity index 52%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
index 627c385..c3ae1fa 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
@@ -20,30 +20,19 @@
 package org.apache.heron.api.grouping;
 
 import java.io.Serializable;
-import java.util.List;
 
-import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.generated.TopologyAPI;
 
-public interface CustomStreamGrouping extends Serializable {
-
-  /**
-   * Tells the stream grouping at runtime the tasks in the target bolt.
-   * This information should be used in chooseTasks to determine the target tasks.
-   * <p>
-   * It also tells the grouping the metadata on the stream this grouping will be used on.
-   */
-  void prepare(
-      TopologyContext context,
-      String component,
-      String streamId,
-      List<Integer> targetTasks);
+/**
+ * This is the common interface for stream grouping strategies used to declare a bolt's inputs.
+ */
+public interface StreamGrouping extends Serializable {
 
   /**
-   * This function implements a custom stream grouping. It takes in as input
-   * the number of tasks in the target bolt in prepare and returns the
-   * tasks to send the tuples to.
-   *
-   * @param values the values to group on
+   * Create an InputStream Builder object with the corresponding grouping logic.
+   * @param componentName The parent component of this grouping logic
+   * @param streamId The id of the input stream
+   * @return An InputStream builder to be used by BoltDeclarer
    */
-  List<Integer> chooseTasks(List<Object> values);
+  TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId);
 }
diff --git a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
index 1956e92..9cddddf 100644
--- a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
+++ b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
@@ -23,11 +23,16 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import com.google.protobuf.ByteString;
-
 import org.apache.heron.api.bolt.IRichBolt;
 import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.grouping.AllStreamGrouping;
 import org.apache.heron.api.grouping.CustomStreamGrouping;
+import org.apache.heron.api.grouping.DirectStreamGrouping;
+import org.apache.heron.api.grouping.FieldsStreamGrouping;
+import org.apache.heron.api.grouping.GlobalStreamGrouping;
+import org.apache.heron.api.grouping.NoneStreamGrouping;
+import org.apache.heron.api.grouping.ShuffleStreamGrouping;
+import org.apache.heron.api.grouping.StreamGrouping;
 import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.api.utils.Utils;
 
@@ -77,20 +82,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer fieldsGrouping(String componentName, String streamId, Fields fields) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.FIELDS);
-    TopologyAPI.StreamSchema.Builder gfbldr = TopologyAPI.StreamSchema.newBuilder();
-    for (int i = 0; i < fields.size(); ++i) {
-      TopologyAPI.StreamSchema.KeyType.Builder ktBldr =
-          TopologyAPI.StreamSchema.KeyType.newBuilder();
-      ktBldr.setKey(fields.get(i));
-      ktBldr.setType(TopologyAPI.Type.OBJECT);
-      gfbldr.addKeys(ktBldr);
-    }
-    bldr.setGroupingFields(gfbldr);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new FieldsStreamGrouping(fields));
   }
 
   public BoltDeclarer globalGrouping(String componentName) {
@@ -98,11 +90,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer globalGrouping(String componentName, String streamId) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.LOWEST);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new GlobalStreamGrouping());
   }
 
   public BoltDeclarer shuffleGrouping(String componentName) {
@@ -110,11 +98,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer shuffleGrouping(String componentName, String streamId) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.SHUFFLE);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new ShuffleStreamGrouping());
   }
 
   public BoltDeclarer localOrShuffleGrouping(String componentName) {
@@ -132,11 +116,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer noneGrouping(String componentName, String streamId) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.NONE);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new NoneStreamGrouping());
   }
 
   public BoltDeclarer allGrouping(String componentName) {
@@ -144,11 +124,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer allGrouping(String componentName, String streamId) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.ALL);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new AllStreamGrouping());
   }
 
   public BoltDeclarer directGrouping(String componentName) {
@@ -156,33 +132,27 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
   }
 
   public BoltDeclarer directGrouping(String componentName, String streamId) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.DIRECT);
-    bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
-    return grouping(bldr);
+    return grouping(componentName, streamId, new DirectStreamGrouping());
   }
 
-  public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouping) {
-    return customGrouping(componentName, Utils.DEFAULT_STREAM_ID, grouping);
+  public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouper) {
+    return customGrouping(componentName, Utils.DEFAULT_STREAM_ID, grouper);
   }
 
   public BoltDeclarer customGrouping(
       String componentName,
       String streamId,
-      CustomStreamGrouping grouping) {
-    TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
-    bldr.setStream(
-        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
-    bldr.setGtype(TopologyAPI.Grouping.CUSTOM);
-    bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
-    bldr.setCustomGroupingObject(ByteString.copyFrom(Utils.serialize(grouping)));
-    return grouping(bldr);
-  }
-
-  private BoltDeclarer grouping(TopologyAPI.InputStream.Builder stream) {
-    inputs.add(stream);
+      CustomStreamGrouping grouper) {
+
+    return grouping(componentName, streamId, grouper);
+  }
+
+  public BoltDeclarer grouping(
+      String componentName,
+      String streamId,
+      StreamGrouping grouper) {
+
+    inputs.add(grouper.buildStream(componentName, streamId));
     return this;
   }
 }