You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:20 UTC
[08/12] beam git commit: fix ParDo.BoundMulti translation
fix ParDo.BoundMulti translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2d326ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2d326ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2d326ff
Branch: refs/heads/gearpump-runner
Commit: b2d326ff73afca5c8e941c8006e9d74261a6b9df
Parents: 364a3f0
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 12:31:26 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 12:31:26 2017 +0800
----------------------------------------------------------------------
.../gearpump/translators/ParDoBoundMultiTranslator.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2d326ff/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 0d5b8bc..bf7073b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -91,8 +91,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
private final DoFn<InputT, OutputT> doFn;
- private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
- .newArrayList();
+ private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
public DoFnMultiFunction(
GearpumpPipelineOptions pipelineOptions,
@@ -127,6 +126,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
@Override
public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+ outputs = Lists.newArrayList();
+
if (null == doFnRunner) {
doFnRunner = doFnRunnerFactory.createRunner();
}
@@ -166,6 +167,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
@Override
public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+ // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue());
return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
wv.getWindows(), wv.getPane());
}