You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 03:39:22 UTC
[3/8] beam git commit: Include PCollection in rehydrated PCollectionView
Include PCollection in rehydrated PCollectionView
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de39f324
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de39f324
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de39f324
Branch: refs/heads/master
Commit: de39f324c4b0914418894a41c6f75596310bf633
Parents: 165dfa6
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 09:24:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 20:14:27 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 51 +++++++++++++++++---
.../construction/RunnerPCollectionView.java | 7 +--
.../core/construction/ParDoTranslationTest.java | 28 +++++++----
3 files changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5f2bcae..fe8c5aa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
@@ -262,8 +264,12 @@ public class ParDoTranslation {
List<PCollectionView<?>> views = new ArrayList<>();
for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
views.add(
- fromProto(
- sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+ viewFromProto(
+ application.getPipeline(),
+ sideInput.getValue(),
+ sideInput.getKey(),
+ parDoProto,
+ sdkComponents.toComponents()));
}
return views;
}
@@ -495,15 +501,47 @@ public class ParDoTranslation {
return builder.build();
}
- public static PCollectionView<?> fromProto(
- SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+ public static PCollectionView<?> viewFromProto(
+ Pipeline pipeline,
+ SideInput sideInput,
+ String localName,
+ RunnerApi.PTransform parDoTransform,
+ Components components)
throws IOException {
- TupleTag<?> tag = new TupleTag<>(id);
+
+ String pCollectionId = parDoTransform.getInputsOrThrow(localName);
+
+ // This may be a PCollection defined in another language, but we should be
+ // able to rehydrate it enough to stick it in a side input. The coder may not
+ // be grokkable in Java.
+ PCollection<?> pCollection =
+ PCollectionTranslation.fromProto(
+ pipeline, components.getPcollectionsOrThrow(pCollectionId), components);
+
+ return viewFromProto(sideInput, localName, pCollection, parDoTransform, components);
+ }
+
+ /**
+ * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link
+ * PCollection} that should be wired up.
+ */
+ public static PCollectionView<?> viewFromProto(
+ SideInput sideInput,
+ String localName,
+ PCollection<?> pCollection,
+ RunnerApi.PTransform parDoTransform,
+ Components components)
+ throws IOException {
+ checkArgument(
+ localName != null,
+ "%s.viewFromProto: localName must not be null",
+ ParDoTranslation.class.getSimpleName());
+ TupleTag<?> tag = new TupleTag<>(localName);
WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
RunnerApi.PCollection inputCollection =
- components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+ components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName));
WindowingStrategy<?, ?> windowingStrategy =
WindowingStrategyTranslation.fromProto(
components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
@@ -523,6 +561,7 @@ public class ParDoTranslation {
PCollectionView<?> view =
new RunnerPCollectionView<>(
+ pCollection,
(TupleTag<Iterable<WindowedValue<?>>>) tag,
(ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
windowMappingFn,
http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index c359cec..b275188 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -39,16 +39,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
private final WindowMappingFn<?> windowMappingFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Coder<Iterable<WindowedValue<?>>> coder;
+ private final transient PCollection<?> pCollection;
/**
* Create a new {@link RunnerPCollectionView} from the provided components.
*/
RunnerPCollectionView(
+ PCollection<?> pCollection,
TupleTag<Iterable<WindowedValue<?>>> tag,
ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
WindowMappingFn<?> windowMappingFn,
@Nullable WindowingStrategy<?, ?> windowingStrategy,
@Nullable Coder<Iterable<WindowedValue<?>>> coder) {
+ this.pCollection = pCollection;
this.tag = tag;
this.viewFn = viewFn;
this.windowMappingFn = windowMappingFn;
@@ -56,11 +59,9 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
this.coder = coder;
}
- @Nullable
@Override
public PCollection<?> getPCollection() {
- throw new IllegalStateException(
- String.format("Cannot call getPCollection on a %s", getClass().getSimpleName()));
+ return pCollection;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index a8490bf..6fdf9d6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -143,22 +143,30 @@ public class ParDoTranslationTest {
inputs.putAll(parDo.getAdditionalInputs());
PCollectionTuple output = mainInput.apply(parDo);
- SdkComponents components = SdkComponents.create();
- String transformId =
- components.registerPTransform(
+ SdkComponents sdkComponents = SdkComponents.create();
+
+ // Encode
+ RunnerApi.PTransform protoTransform =
+ PTransformTranslation.toProto(
AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
"foo", inputs, output.expand(), parDo, p),
- Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+ sdkComponents);
+ Components protoComponents = sdkComponents.toComponents();
+
+ // Decode
+ Pipeline rehydratedPipeline = Pipeline.create();
- Components protoComponents = components.toComponents();
- RunnerApi.PTransform protoTransform = protoComponents.getTransformsOrThrow(transformId);
ParDoPayload parDoPayload =
protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
for (PCollectionView<?> view : parDo.getSideInputs()) {
SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
PCollectionView<?> restoredView =
- ParDoTranslation.fromProto(
- sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+ ParDoTranslation.viewFromProto(
+ rehydratedPipeline,
+ sideInput,
+ view.getTagInternal().getId(),
+ protoTransform,
+ protoComponents);
assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
assertThat(
@@ -169,7 +177,7 @@ public class ParDoTranslationTest {
view.getWindowingStrategyInternal().fixDefaults()));
assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
}
- String mainInputId = components.registerPCollection(mainInput);
+ String mainInputId = sdkComponents.registerPCollection(mainInput);
assertThat(
ParDoTranslation.getMainInput(protoTransform, protoComponents),
equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));