You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:28 UTC
[28/53] [abbrv] beam git commit: jstorm-runner: 1. Use the TupleTag
of "PCollection expand" when getting input tags and output tags 2. Check the
exception record when evaluating assertions in unit tests
jstorm-runner:
1. Use the TupleTag of "PCollection expand" when getting input tags and output tags
2. Check the exception record when evaluating assertions in unit tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f3eda6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f3eda6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f3eda6
Branch: refs/heads/jstorm-runner
Commit: 30f3eda64c68cea092c42b7acc1dfd98eb8cbbd0
Parents: df154de
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Jul 17 15:55:01 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/TestJStormRunner.java | 9 +++++++++
.../runners/jstorm/translation/DoFnExecutor.java | 6 +++++-
.../jstorm/translation/MultiOutputDoFnExecutor.java | 6 +-----
.../jstorm/translation/ParDoBoundTranslator.java | 5 +++--
.../jstorm/translation/TranslationContext.java | 16 +++++++++-------
5 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b1b0379..a117675 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -8,6 +8,7 @@ import com.alibaba.jstorm.metric.AsmWindow;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
import com.alibaba.jstorm.utils.JStormUtils;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
@@ -56,14 +57,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
if (numberOfAssertions == 0) {
// If assert number is zero, wait 5 sec
JStormUtils.sleepMs(5000);
+ Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
+ if (taskExceptionRec != null) {
+ throw new RuntimeException(taskExceptionRec.getCause());
+ }
return result;
} else {
for (int i = 0; i < 40; ++i) {
Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
+ Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
if (success.isPresent() && success.get()) {
return result;
} else if (success.isPresent() && !success.get()) {
throw new AssertionError("Failed assertion checks.");
+ } else if (taskExceptionRec != null) {
+ throw new RuntimeException(taskExceptionRec.getCause());
} else {
JStormUtils.sleepMs(500);
}
@@ -74,6 +82,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
} finally {
clearPAssertCount();
cancel(result);
+ TaskReportErrorAndDie.setExceptionRecord(null);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index fdd9af6..6baa944 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -203,8 +203,12 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
tag, mainInputTag, sideInputs, elem.getValue()));
if (mainInputTag.equals(tag)) {
processMainInput(elem);
- } else {
+ } else if (sideInputTagToView.containsKey(tag)) {
processSideInput(tag, elem);
+ } else {
+ LOG.warn("Discard unexpected elem={} from tag={}", elem.getValue(), tag);
+ LOG.warn("Current mainInputTag={}, sideInputTags={}",
+ mainInputTag, sideInputTagToView.keySet());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
index 49b0f85..138a5dc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -46,11 +46,7 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp
public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- if (localTupleTagMap.containsKey(tag)) {
- executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
- } else {
- executorsBolt.processExecutorElem(tag, output);
- }
+ executorsBolt.processExecutorElem(tag, output);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
index 6feb7f8..e6d09c4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
@@ -20,8 +20,10 @@ package org.apache.beam.runners.jstorm.translation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -54,8 +56,7 @@ class ParDoBoundTranslator<InputT, OutputT>
TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
- Map<TupleTag<?>, PValue> allInputs =
- avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+ Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
for (PCollectionView pCollectionView : transform.getSideInputs()) {
allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index e25f211..101921f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -36,7 +36,6 @@ import java.util.Map;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
@@ -184,10 +183,6 @@ public class TranslationContext {
TupleTag tag = entry.getKey();
PValue value = entry.getValue();
- // use tag of PValueBase
- if (value instanceof PValueBase) {
- tag = ((PValueBase) value).expand().keySet().iterator().next();
- }
executionGraphContext.registerStreamProducer(
TaggedPValue.of(tag, value),
Stream.Producer.of(name, tag.getId(), value.getName()));
@@ -198,6 +193,9 @@ public class TranslationContext {
for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
TupleTag tag = entry.getKey();
PValue value = entry.getValue();
+ if (userGraphContext.findTupleTag(value) != null) {
+ tag = userGraphContext.findTupleTag(value);
+ }
bolt.addExecutor(tag, executor);
// filter all connections inside bolt
@@ -269,11 +267,15 @@ public class TranslationContext {
}
public TupleTag<?> getInputTag() {
- return currentTransform.getInputs().keySet().iterator().next();
+ return pValueToTupleTag.get(this.getInput());
}
public List<TupleTag<?>> getInputTags() {
- return Lists.newArrayList(currentTransform.getInputs().keySet());
+ List inputTags = Lists.newArrayList();
+ for (PValue value : currentTransform.getInputs().values()) {
+ inputTags.add(pValueToTupleTag.get(value));
+ }
+ return inputTags;
}
public <T extends PValue> T getOutput() {