You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:42 UTC

[42/53] [abbrv] beam git commit: jstorm-runner: Support multiple copies of Flatten

jstorm-runner: Support multiple copies of Flatten


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1178f9fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1178f9fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1178f9fb

Branch: refs/heads/jstorm-runner
Commit: 1178f9fb957c7e6cf1b277696ff63dc0e29a6d5e
Parents: 52913b7
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Jul 20 20:04:24 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:00 2017 +0800

----------------------------------------------------------------------
 .../runners/jstorm/translation/FlattenExecutor.java  | 12 ++++++++++--
 .../jstorm/translation/FlattenTranslator.java        | 15 +++++++++++++--
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
index a64f494..928fa24 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.jstorm.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Map;
+
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -32,8 +34,11 @@ class FlattenExecutor<InputT> implements Executor {
   private TupleTag mainOutputTag;
   private ExecutorContext context;
   private ExecutorsBolt executorsBolt;
+  private final Map<TupleTag, Integer> tagToCopyNum;
 
-  public FlattenExecutor(String description, TupleTag mainTupleTag) {
+  public FlattenExecutor(String description, TupleTag mainTupleTag,
+                         Map<TupleTag, Integer> tagToCopyNum) {
+    this.tagToCopyNum = checkNotNull(tagToCopyNum, "tagToCopyNum");
     this.description = checkNotNull(description, "description");
     this.mainOutputTag = mainTupleTag;
   }
@@ -46,7 +51,10 @@ class FlattenExecutor<InputT> implements Executor {
 
   @Override
   public void process(TupleTag tag, WindowedValue elem) {
-    executorsBolt.processExecutorElem(mainOutputTag, elem);
+    int copyNum = tagToCopyNum.get(tag);
+    for (int i = 0; i < copyNum; i++) {
+      executorsBolt.processExecutorElem(mainOutputTag, elem);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index e104ad8..b96bc56 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -48,9 +48,19 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
 
     // Since a new tag is created in PCollectionList, retrieve the real tag here.
     Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+    Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap();
     for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
       PCollection<V> pc = (PCollection<V>) entry.getValue();
-      inputs.putAll(pc.expand());
+      //inputs.putAll(pc.expand());
+      for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) {
+        if (inputs.containsKey(entry1.getKey())) {
+          int copyNum = tagToCopyNum.get(entry1.getKey());
+          tagToCopyNum.put(entry1.getKey(), ++copyNum);
+        } else {
+          inputs.put(entry1.getKey(), entry1.getValue());
+          tagToCopyNum.put(entry1.getKey(), 1);
+        }
+      }
     }
     String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
 
@@ -67,7 +77,8 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
       context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
 
     } else {
-      FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+      FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag(),
+          tagToCopyNum);
       context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
     }
   }