You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:56:04 UTC
[49/50] [abbrv] beam git commit: Fix side input handling in DoFnFunction
Fix side input handling in DoFnFunction
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7653e7ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7653e7ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7653e7ed
Branch: refs/heads/gearpump-runner
Commit: 7653e7ed6de3d9db822dcd390d2bf70819954fa5
Parents: 98854d4
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 7 14:08:04 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 12 11:45:37 2017 +0800
----------------------------------------------------------------------
.../translators/TranslationContext.java | 2 ++
.../translators/functions/DoFnFunction.java | 23 ++++----------------
.../gearpump/translators/io/GearpumpSource.java | 1 -
.../translators/utils/TranslatorUtils.java | 5 ++---
4 files changed, 8 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 4090354..64a1e0d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -68,6 +68,8 @@ public class TranslationContext {
public <OutputT> void setOutputStream(PValue output, JavaStream<OutputT> outputStream) {
if (!streams.containsKey(output)) {
streams.put(output, outputStream);
+ } else {
+ throw new RuntimeException("set stream for duplicated output " + output);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index f521d7b..6e4fbeb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -66,7 +65,6 @@ public class DoFnFunction<InputT, OutputT> extends
private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner;
private transient SideInputHandler sideInputReader;
private transient List<WindowedValue<InputT>> pushedBackValues;
- private transient Map<PCollectionView<?>, List<WindowedValue<Iterable<?>>>> sideInputValues;
private final Collection<PCollectionView<?>> sideInputs;
private final Map<String, PCollectionView<?>> tagsToSideInputs;
private final TupleTag<OutputT> mainOutput;
@@ -109,7 +107,6 @@ public class DoFnFunction<InputT, OutputT> extends
doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
pushedBackValues = new LinkedList<>();
- sideInputValues = new HashMap<>();
outputManager.setup(mainOutput, sideOutputs);
}
@@ -132,25 +129,14 @@ public class DoFnFunction<InputT, OutputT> extends
} else {
// side input
PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag());
- WindowedValue<?> sideInputValue =
- (WindowedValue<?>) unionValue.getValue();
- Object value = sideInputValue.getValue();
- if (!(value instanceof Iterable)) {
- sideInputValue = sideInputValue.withValue(Lists.newArrayList(value));
- }
- if (!sideInputValues.containsKey(sideInput)) {
- sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>());
- }
- sideInputValues.get(sideInput).add((WindowedValue<Iterable<?>>) sideInputValue);
+ WindowedValue<Iterable<?>> sideInputValue =
+ (WindowedValue<Iterable<?>>) unionValue.getValue();
+ sideInputReader.addSideInputValue(sideInput, sideInputValue);
}
}
+
for (PCollectionView<?> sideInput: sideInputs) {
- if (sideInputValues.containsKey(sideInput)) {
- for (WindowedValue<Iterable<?>> value: sideInputValues.get(sideInput)) {
- sideInputReader.addSideInputValue(sideInput, value);
- }
- }
for (WindowedValue<InputT> value : pushedBackValues) {
for (BoundedWindow win: value.getWindows()) {
BoundedWindow sideInputWindow =
@@ -171,7 +157,6 @@ public class DoFnFunction<InputT, OutputT> extends
}
pushedBackValues.clear();
Iterables.addAll(pushedBackValues, nextPushedBackValues);
- sideInputValues.clear();
doFnRunner.finishBundle();
http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 5e79151..60f319d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-// import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 999afae..282f261 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -70,11 +70,10 @@ public class TranslatorUtils {
JavaStream<WindowedValue<InputT>> inputStream,
Map<String, PCollectionView<?>> tagsToSideInputs) {
JavaStream<RawUnionValue> mainStream =
- inputStream.map(new ToRawUnionValue<InputT>("0"), "map_to_RawUnionValue");
+ inputStream.map(new ToRawUnionValue<>("0"), "map_to_RawUnionValue");
for (Map.Entry<String, PCollectionView<?>> tagToSideInput: tagsToSideInputs.entrySet()) {
- // actually JavaStream<WindowedValue<List<?>>>
- JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream(
+ JavaStream<WindowedValue<List<?>>> sideInputStream = context.getInputStream(
tagToSideInput.getValue());
mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");