You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:42 UTC
[42/53] [abbrv] beam git commit: jstorm-runner: Support multiple
copies of Flatten
jstorm-runner: Support multiple copies of Flatten
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1178f9fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1178f9fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1178f9fb
Branch: refs/heads/jstorm-runner
Commit: 1178f9fb957c7e6cf1b277696ff63dc0e29a6d5e
Parents: 52913b7
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Jul 20 20:04:24 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:00 2017 +0800
----------------------------------------------------------------------
.../runners/jstorm/translation/FlattenExecutor.java | 12 ++++++++++--
.../jstorm/translation/FlattenTranslator.java | 15 +++++++++++++--
2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
index a64f494..928fa24 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.jstorm.translation;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Map;
+
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -32,8 +34,11 @@ class FlattenExecutor<InputT> implements Executor {
private TupleTag mainOutputTag;
private ExecutorContext context;
private ExecutorsBolt executorsBolt;
+ private final Map<TupleTag, Integer> tagToCopyNum;
- public FlattenExecutor(String description, TupleTag mainTupleTag) {
+ public FlattenExecutor(String description, TupleTag mainTupleTag,
+ Map<TupleTag, Integer> tagToCopyNum) {
+ this.tagToCopyNum = checkNotNull(tagToCopyNum, "tagToCopyNum");
this.description = checkNotNull(description, "description");
this.mainOutputTag = mainTupleTag;
}
@@ -46,7 +51,10 @@ class FlattenExecutor<InputT> implements Executor {
@Override
public void process(TupleTag tag, WindowedValue elem) {
- executorsBolt.processExecutorElem(mainOutputTag, elem);
+ int copyNum = tagToCopyNum.get(tag);
+ for (int i = 0; i < copyNum; i++) {
+ executorsBolt.processExecutorElem(mainOutputTag, elem);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index e104ad8..b96bc56 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -48,9 +48,19 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
// Since a new tag is created in PCollectionList, retrieve the real tag here.
Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+ Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap();
for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
PCollection<V> pc = (PCollection<V>) entry.getValue();
- inputs.putAll(pc.expand());
+ //inputs.putAll(pc.expand());
+ for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) {
+ if (inputs.containsKey(entry1.getKey())) {
+ int copyNum = tagToCopyNum.get(entry1.getKey());
+ tagToCopyNum.put(entry1.getKey(), ++copyNum);
+ } else {
+ inputs.put(entry1.getKey(), entry1.getValue());
+ tagToCopyNum.put(entry1.getKey(), 1);
+ }
+ }
}
String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
@@ -67,7 +77,8 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
} else {
- FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+ FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag(),
+ tagToCopyNum);
context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
}
}