You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 03:39:24 UTC
[5/8] beam git commit: Port DirectRunner ParDo overrides to
SDK-agnostic APIs
Port DirectRunner ParDo overrides to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac4b7e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac4b7e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac4b7e6
Branch: refs/heads/master
Commit: 1ac4b7e6f7dbbd68c27c6634cd52767885a42760
Parents: fa61ed1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 13:44:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 20:17:56 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 16 ++++++---
.../construction/RunnerPCollectionView.java | 16 +++++++++
.../direct/ParDoMultiOverrideFactory.java | 35 +++++++++-----------
3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe8c5aa..90c9aad 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
@@ -262,12 +263,19 @@ public class ParDoTranslation {
ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
List<PCollectionView<?>> views = new ArrayList<>();
- for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
+ for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+ String sideInputTag = sideInputEntry.getKey();
+ RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+ PCollection<?> originalPCollection =
+ checkNotNull(
+ (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
+ "no input with tag %s",
+ sideInputTag);
views.add(
viewFromProto(
- application.getPipeline(),
- sideInput.getValue(),
- sideInput.getKey(),
+ sideInput,
+ sideInputTag,
+ originalPCollection,
parDoProto,
sdkComponents.toComponents()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index b275188..85139e8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.core.construction;
import java.util.Map;
+import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -94,4 +95,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
throw new UnsupportedOperationException(String.format(
"A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName()));
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof PCollectionView)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ PCollectionView<?> otherView = (PCollectionView<?>) other;
+ return tag.equals(otherView.getTagInternal());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tag);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ac4b7e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8881967..891d102 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -73,9 +72,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
PCollection<? extends InputT>, PCollectionTuple,
PTransform<PCollection<? extends InputT>, PCollectionTuple>>
application) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(application),
- getReplacementForApplication(application));
+
+ try {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(application),
+ getReplacementForApplication(application));
+ } catch (IOException exc) {
+ throw new RuntimeException(exc);
+ }
}
@SuppressWarnings("unchecked")
@@ -83,31 +87,22 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
AppliedPTransform<
PCollection<? extends InputT>, PCollectionTuple,
PTransform<PCollection<? extends InputT>, PCollectionTuple>>
- application) {
+ application)
+ throws IOException {
- DoFn<InputT, OutputT> fn;
- try {
- fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
- } catch (IOException exc) {
- throw new RuntimeException(exc);
- }
+ DoFn<InputT, OutputT> fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
if (signature.processElement().isSplittable()) {
return (PTransform) SplittableParDo.forAppliedParDo(application);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
-
- MultiOutput<InputT, OutputT> transform =
- (MultiOutput<InputT, OutputT>) application.getTransform();
-
- // Based on the fact that the signature is stateful, DoFnSignatures ensures
- // that it is also keyed
return new GbkThenStatefulParDo(
fn,
- transform.getMainOutputTag(),
- transform.getAdditionalOutputTags(),
- transform.getSideInputs());
+ ParDoTranslation.getMainOutputTag(application),
+ ParDoTranslation.getAdditionalOutputTags(application),
+ ParDoTranslation.getSideInputs(application));
} else {
return application.getTransform();
}