You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/19 21:35:30 UTC
[incubator-heron] branch master updated: Nwang/refactor grouping
(#3040)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 72293cf Nwang/refactor grouping (#3040)
72293cf is described below
commit 72293cf86ef0fce4f53e37107ba88f37c746c618
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Fri Oct 19 14:35:26 2018 -0700
Nwang/refactor grouping (#3040)
* Refactor grouping in BoltDeclarer
* clean up
* fix direct grouping
* clean up
---
...mStreamGrouping.java => AllStreamGrouping.java} | 37 ++++------
.../heron/api/grouping/CustomStreamGrouping.java | 28 +++++++-
...reamGrouping.java => DirectStreamGrouping.java} | 38 ++++------
.../heron/api/grouping/FieldsStreamGrouping.java | 54 ++++++++++++++
...reamGrouping.java => GlobalStreamGrouping.java} | 38 ++++------
...StreamGrouping.java => NoneStreamGrouping.java} | 37 ++++------
...eamGrouping.java => ShuffleStreamGrouping.java} | 38 ++++------
...stomStreamGrouping.java => StreamGrouping.java} | 31 +++-----
.../apache/heron/api/topology/BoltDeclarer.java | 82 +++++++---------------
9 files changed, 189 insertions(+), 194 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
index 627c385..1310000 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/AllStreamGrouping.java
@@ -19,31 +19,22 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that all tuples are transmitted to all instances of a bolt.
+ */
+public class AllStreamGrouping implements StreamGrouping {
+
+ public AllStreamGrouping() { }
-public interface CustomStreamGrouping extends Serializable {
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.ALL);
- /**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
- */
- List<Integer> chooseTasks(List<Object> values);
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
index 627c385..57db97d 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
@@ -19,12 +19,18 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
import java.util.List;
+import com.google.protobuf.ByteString;
+
+import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.utils.Utils;
-public interface CustomStreamGrouping extends Serializable {
+/**
+ * This is the interface for user defined stream grouping strategies.
+ */
+public interface CustomStreamGrouping extends StreamGrouping {
/**
* Tells the stream grouping at runtime the tasks in the target bolt.
@@ -46,4 +52,22 @@ public interface CustomStreamGrouping extends Serializable {
* @param values the values to group on
*/
List<Integer> chooseTasks(List<Object> values);
+
+ /**
+ * Build InputStream for CustomStreamGrouping implementations.
+ * @param componentName The parent component of this grouping logic
+ * @param streamId The id of the input stream
+ * @return An InputStream builder to be used by BoltDeclarer
+ */
+ default TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
+
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.CUSTOM);
+ bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
+ bldr.setCustomGroupingObject(ByteString.copyFrom(Utils.serialize(this)));
+
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
index 627c385..2b0d8b3 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/DirectStreamGrouping.java
@@ -19,31 +19,23 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that tuples are sent to the instance of choice.
+ */
+public class DirectStreamGrouping implements StreamGrouping {
+
+ public DirectStreamGrouping() { }
-public interface CustomStreamGrouping extends Serializable {
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.DIRECT);
+ bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
- /**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
- */
- List<Integer> chooseTasks(List<Object> values);
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java
new file mode 100644
index 0000000..5e3ef29
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/grouping/FieldsStreamGrouping.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.grouping;
+
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.tuple.Fields;
+
+/**
+ * This is the stream grouping strategy that tuples are sent to the particular instance of
+ * the downstream bolt based on the values of a specified fields.
+ */
+public class FieldsStreamGrouping implements StreamGrouping {
+ private Fields fields;
+
+ public FieldsStreamGrouping(Fields fields) {
+ this.fields = fields;
+ }
+
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
+
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.FIELDS);
+ TopologyAPI.StreamSchema.Builder gfbldr = TopologyAPI.StreamSchema.newBuilder();
+ for (int i = 0; i < fields.size(); ++i) {
+ TopologyAPI.StreamSchema.KeyType.Builder ktBldr =
+ TopologyAPI.StreamSchema.KeyType.newBuilder();
+ ktBldr.setKey(fields.get(i));
+ ktBldr.setType(TopologyAPI.Type.OBJECT);
+ gfbldr.addKeys(ktBldr);
+ }
+ bldr.setGroupingFields(gfbldr);
+
+ return bldr;
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
index 627c385..cf119e6 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/GlobalStreamGrouping.java
@@ -19,31 +19,23 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that all tuples are transmitted to a single instance
+ * of a bolt with the lowest task id.
+ */
+public class GlobalStreamGrouping implements StreamGrouping {
+
+ public GlobalStreamGrouping() { }
-public interface CustomStreamGrouping extends Serializable {
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.LOWEST);
- /**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
- */
- List<Integer> chooseTasks(List<Object> values);
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
index 627c385..9e7c5a2 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/NoneStreamGrouping.java
@@ -19,31 +19,22 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that is the same as shuffle grouping.
+ */
+public class NoneStreamGrouping implements StreamGrouping {
+
+ public NoneStreamGrouping() { }
-public interface CustomStreamGrouping extends Serializable {
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.NONE);
- /**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
- */
- List<Integer> chooseTasks(List<Object> values);
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
similarity index 50%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
index 627c385..eb06336 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/ShuffleStreamGrouping.java
@@ -19,31 +19,23 @@
package org.apache.heron.api.grouping;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyContext;
+/**
+ * This is the stream grouping strategy that tuples are randomly distributed to instances of
+ * the bolt.
+ */
+public class ShuffleStreamGrouping implements StreamGrouping {
+
+ public ShuffleStreamGrouping() { }
-public interface CustomStreamGrouping extends Serializable {
+ public TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId) {
+ TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+ bldr.setStream(
+ TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
+ bldr.setGtype(TopologyAPI.Grouping.SHUFFLE);
- /**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
- */
- List<Integer> chooseTasks(List<Object> values);
+ return bldr;
+ }
}
diff --git a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java b/heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
similarity index 52%
copy from heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
copy to heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
index 627c385..c3ae1fa 100644
--- a/heron/api/src/java/org/apache/heron/api/grouping/CustomStreamGrouping.java
+++ b/heron/api/src/java/org/apache/heron/api/grouping/StreamGrouping.java
@@ -20,30 +20,19 @@
package org.apache.heron.api.grouping;
import java.io.Serializable;
-import java.util.List;
-import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.generated.TopologyAPI;
-public interface CustomStreamGrouping extends Serializable {
-
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt.
- * This information should be used in chooseTasks to determine the target tasks.
- * <p>
- * It also tells the grouping the metadata on the stream this grouping will be used on.
- */
- void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks);
+/**
+ * This is the interface of stream grouping at runtime the tasks in the target bolt.
+ */
+public interface StreamGrouping extends Serializable {
/**
- * This function implements a custom stream grouping. It takes in as input
- * the number of tasks in the target bolt in prepare and returns the
- * tasks to send the tuples to.
- *
- * @param values the values to group on
+ * Create an InputStream Builder object with the corresponding grouping logic.
+ * @param componentName The parent component of this grouping logic
+ * @param streamId The id of the input stream
+ * @return An InputStream builder to be used by BoltDeclarer
*/
- List<Integer> chooseTasks(List<Object> values);
+ TopologyAPI.InputStream.Builder buildStream(String componentName, String streamId);
}
diff --git a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
index 1956e92..9cddddf 100644
--- a/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
+++ b/heron/api/src/java/org/apache/heron/api/topology/BoltDeclarer.java
@@ -23,11 +23,16 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import com.google.protobuf.ByteString;
-
import org.apache.heron.api.bolt.IRichBolt;
import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.grouping.AllStreamGrouping;
import org.apache.heron.api.grouping.CustomStreamGrouping;
+import org.apache.heron.api.grouping.DirectStreamGrouping;
+import org.apache.heron.api.grouping.FieldsStreamGrouping;
+import org.apache.heron.api.grouping.GlobalStreamGrouping;
+import org.apache.heron.api.grouping.NoneStreamGrouping;
+import org.apache.heron.api.grouping.ShuffleStreamGrouping;
+import org.apache.heron.api.grouping.StreamGrouping;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.utils.Utils;
@@ -77,20 +82,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer fieldsGrouping(String componentName, String streamId, Fields fields) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.FIELDS);
- TopologyAPI.StreamSchema.Builder gfbldr = TopologyAPI.StreamSchema.newBuilder();
- for (int i = 0; i < fields.size(); ++i) {
- TopologyAPI.StreamSchema.KeyType.Builder ktBldr =
- TopologyAPI.StreamSchema.KeyType.newBuilder();
- ktBldr.setKey(fields.get(i));
- ktBldr.setType(TopologyAPI.Type.OBJECT);
- gfbldr.addKeys(ktBldr);
- }
- bldr.setGroupingFields(gfbldr);
- return grouping(bldr);
+ return grouping(componentName, streamId, new FieldsStreamGrouping(fields));
}
public BoltDeclarer globalGrouping(String componentName) {
@@ -98,11 +90,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer globalGrouping(String componentName, String streamId) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.LOWEST);
- return grouping(bldr);
+ return grouping(componentName, streamId, new GlobalStreamGrouping());
}
public BoltDeclarer shuffleGrouping(String componentName) {
@@ -110,11 +98,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer shuffleGrouping(String componentName, String streamId) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.SHUFFLE);
- return grouping(bldr);
+ return grouping(componentName, streamId, new ShuffleStreamGrouping());
}
public BoltDeclarer localOrShuffleGrouping(String componentName) {
@@ -132,11 +116,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer noneGrouping(String componentName, String streamId) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.NONE);
- return grouping(bldr);
+ return grouping(componentName, streamId, new NoneStreamGrouping());
}
public BoltDeclarer allGrouping(String componentName) {
@@ -144,11 +124,7 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer allGrouping(String componentName, String streamId) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.ALL);
- return grouping(bldr);
+ return grouping(componentName, streamId, new AllStreamGrouping());
}
public BoltDeclarer directGrouping(String componentName) {
@@ -156,33 +132,27 @@ public class BoltDeclarer extends BaseComponentDeclarer<BoltDeclarer> {
}
public BoltDeclarer directGrouping(String componentName, String streamId) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.DIRECT);
- bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
- return grouping(bldr);
+ return grouping(componentName, streamId, new DirectStreamGrouping());
}
- public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouping) {
- return customGrouping(componentName, Utils.DEFAULT_STREAM_ID, grouping);
+ public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouper) {
+ return customGrouping(componentName, Utils.DEFAULT_STREAM_ID, grouper);
}
public BoltDeclarer customGrouping(
String componentName,
String streamId,
- CustomStreamGrouping grouping) {
- TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder();
- bldr.setStream(
- TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName));
- bldr.setGtype(TopologyAPI.Grouping.CUSTOM);
- bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT);
- bldr.setCustomGroupingObject(ByteString.copyFrom(Utils.serialize(grouping)));
- return grouping(bldr);
- }
-
- private BoltDeclarer grouping(TopologyAPI.InputStream.Builder stream) {
- inputs.add(stream);
+ CustomStreamGrouping grouper) {
+
+ return grouping(componentName, streamId, grouper);
+ }
+
+ public BoltDeclarer grouping(
+ String componentName,
+ String streamId,
+ StreamGrouping grouper) {
+
+ inputs.add(grouper.buildStream(componentName, streamId));
return this;
}
}