You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:28 UTC

[28/53] [abbrv] beam git commit: jstorm-runner: 1. Use the TupleTag of "PCollection expand" when getting input tags and output tags 2. Check the exception record when asserting of unit test

jstorm-runner:
1. Use the TupleTag of "PCollection expand" when getting input tags and output tags
2. Check the exception record when asserting of unit test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f3eda6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f3eda6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f3eda6

Branch: refs/heads/jstorm-runner
Commit: 30f3eda64c68cea092c42b7acc1dfd98eb8cbbd0
Parents: df154de
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Jul 17 15:55:01 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/TestJStormRunner.java       |  9 +++++++++
 .../runners/jstorm/translation/DoFnExecutor.java    |  6 +++++-
 .../jstorm/translation/MultiOutputDoFnExecutor.java |  6 +-----
 .../jstorm/translation/ParDoBoundTranslator.java    |  5 +++--
 .../jstorm/translation/TranslationContext.java      | 16 +++++++++-------
 5 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b1b0379..a117675 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -8,6 +8,7 @@ import com.alibaba.jstorm.metric.AsmWindow;
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.metric.MetaType;
 import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
@@ -56,14 +57,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
       if (numberOfAssertions == 0) {
         // If assert number is zero, wait 5 sec
         JStormUtils.sleepMs(5000);
+        Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
+        if (taskExceptionRec != null) {
+          throw new RuntimeException(taskExceptionRec.getCause());
+        }
         return result;
       } else {
         for (int i = 0; i < 40; ++i) {
           Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
+          Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
           if (success.isPresent() && success.get()) {
             return result;
           } else if (success.isPresent() && !success.get()) {
             throw new AssertionError("Failed assertion checks.");
+          } else if (taskExceptionRec != null) {
+            throw new RuntimeException(taskExceptionRec.getCause());
           } else {
             JStormUtils.sleepMs(500);
           }
@@ -74,6 +82,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
     } finally {
       clearPAssertCount();
       cancel(result);
+      TaskReportErrorAndDie.setExceptionRecord(null);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index fdd9af6..6baa944 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -203,8 +203,12 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
         tag, mainInputTag, sideInputs, elem.getValue()));
     if (mainInputTag.equals(tag)) {
       processMainInput(elem);
-    } else {
+    } else if (sideInputTagToView.containsKey(tag)) {
       processSideInput(tag, elem);
+    } else {
+      LOG.warn("Discard unexpected elem={} from tag={}", elem.getValue(), tag);
+      LOG.warn("Current mainInputTag={}, sideInputTags={}",
+              mainInputTag, sideInputTagToView.keySet());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
index 49b0f85..138a5dc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -46,11 +46,7 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp
   public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
     @Override
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      if (localTupleTagMap.containsKey(tag)) {
-        executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
-      } else {
-        executorsBolt.processExecutorElem(tag, output);
-      }
+      executorsBolt.processExecutorElem(tag, output);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
index 6feb7f8..e6d09c4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
@@ -20,8 +20,10 @@ package org.apache.beam.runners.jstorm.translation;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -54,8 +56,7 @@ class ParDoBoundTranslator<InputT, OutputT>
     TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
     List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
 
-    Map<TupleTag<?>, PValue> allInputs =
-        avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
     for (PCollectionView pCollectionView : transform.getSideInputs()) {
       allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index e25f211..101921f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -36,7 +36,6 @@ import java.util.Map;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
@@ -184,10 +183,6 @@ public class TranslationContext {
       TupleTag tag = entry.getKey();
       PValue value = entry.getValue();
 
-      // use tag of PValueBase
-      if (value instanceof PValueBase) {
-        tag = ((PValueBase) value).expand().keySet().iterator().next();
-      }
       executionGraphContext.registerStreamProducer(
           TaggedPValue.of(tag, value),
           Stream.Producer.of(name, tag.getId(), value.getName()));
@@ -198,6 +193,9 @@ public class TranslationContext {
     for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
       TupleTag tag = entry.getKey();
       PValue value = entry.getValue();
+      if (userGraphContext.findTupleTag(value) != null) {
+        tag = userGraphContext.findTupleTag(value);
+      }
       bolt.addExecutor(tag, executor);
 
       // filter all connections inside bolt
@@ -269,11 +267,15 @@ public class TranslationContext {
     }
 
     public TupleTag<?> getInputTag() {
-      return currentTransform.getInputs().keySet().iterator().next();
+      return pValueToTupleTag.get(this.getInput());
     }
 
     public List<TupleTag<?>> getInputTags() {
-      return Lists.newArrayList(currentTransform.getInputs().keySet());
+      List inputTags = Lists.newArrayList();
+      for (PValue value : currentTransform.getInputs().values()) {
+        inputTags.add(pValueToTupleTag.get(value));
+      }
+      return inputTags;
     }
 
     public <T extends PValue> T getOutput() {