You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:24 UTC
[33/50] [abbrv] beam git commit: Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.
Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/accb2087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/accb2087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/accb2087
Branch: refs/heads/tez-runner
Commit: accb2087f88c641be9db038bbb5be715aacffb8d
Parents: c2f815c
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 19:19:48 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 16 +++-
.../runners/dataflow/CreateDataflowView.java | 6 +-
.../beam/runners/dataflow/DataflowRunner.java | 80 +++++++++++++++-----
.../DataflowPipelineTranslatorTest.java | 6 +-
4 files changed, 80 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 8ed41cb..2953a42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -893,15 +895,23 @@ class BatchViewOverrides {
private final DataflowRunner runner;
private final PCollectionView<T> view;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) {
+ private final CombineFn<T, ?, T> combineFn;
+ private final int fanout;
+
+ public BatchViewAsSingleton(
+ DataflowRunner runner,
+ CreatePCollectionView<T, T> transform,
+ CombineFn<T, ?, T> combineFn,
+ int fanout) {
this.runner = runner;
this.view = transform.getView();
+ this.combineFn = combineFn;
+ this.fanout = fanout;
}
@Override
public PCollection<?> expand(PCollection<T> input) {
+ input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 10888c2..f64f3fb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,11 +25,13 @@ import org.apache.beam.sdk.values.PCollectionView;
/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
public class CreateDataflowView<ElemT, ViewT>
extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(
+ PCollectionView<ViewT> view) {
return new CreateDataflowView<>(view, false);
}
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(
+ PCollectionView<ViewT> view) {
return new CreateDataflowView<>(view, true);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 72e4f83..a650092 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -116,6 +116,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -406,9 +407,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
BatchViewOverrides.BatchViewAsMultimap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveViewOverrideFactory(
- BatchViewOverrides.BatchViewAsSingleton.class, this)))
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new CombineGloballyAsSingletonViewOverrideFactory(this)))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(View.AsList.class),
@@ -437,29 +437,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
- * new replacement transform uses the supplied PCollectionView and does not create another instance.
+ * Replace the {@link Combine.GloballyAsSingletonView} transform with a specialization which
+ * re-applies the {@link CombineFn} and adds a specialization specific to the Dataflow runner.
+ */
+ private static class CombineGloballyAsSingletonViewOverrideFactory<InputT, ViewT>
+ extends ReflectiveViewOverrideFactory<InputT, ViewT> {
+
+ private CombineGloballyAsSingletonViewOverrideFactory(DataflowRunner runner) {
+ super((Class) BatchViewOverrides.BatchViewAsSingleton.class, runner);
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ AppliedPTransform<
+ PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+ Combine.GloballyAsSingletonView<?, ?> combineTransform =
+ (Combine.GloballyAsSingletonView) transform.getTransform();
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new BatchViewOverrides.BatchViewAsSingleton(
+ runner,
+ findCreatePCollectionView(transform),
+ (CombineFn) combineTransform.getCombineFn(),
+ combineTransform.getFanout()));
+ }
+ }
+
+ /**
+ * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required
+ * that the new replacement transform uses the supplied PCollectionView and does not create
+ * another instance.
*/
private static class ReflectiveViewOverrideFactory<InputT, ViewT>
implements PTransformOverrideFactory<PCollection<InputT>,
PValue, PTransform<PCollection<InputT>, PValue>> {
- private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
- private final DataflowRunner runner;
+ final Class<PTransform<PCollection<InputT>, PValue>> replacement;
+ final DataflowRunner runner;
private ReflectiveViewOverrideFactory(
- Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+ Class<PTransform<PCollection<InputT>, PValue>> replacement,
DataflowRunner runner) {
this.replacement = replacement;
this.runner = runner;
}
- @Override
- public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
- final AppliedPTransform<PCollection<InputT>,
- PValue,
- PTransform<PCollection<InputT>, PValue>> transform) {
-
+ CreatePCollectionView<ViewT, ViewT> findCreatePCollectionView(
+ final AppliedPTransform<
+ PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
// Stores whether we have entered the expected composite view transform.
@@ -495,18 +524,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
checkState(viewTransformRef.get() != null,
"Expected to find CreatePCollectionView contained within %s",
transform.getTransform());
- PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+ return viewTransformRef.get();
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ final AppliedPTransform<PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+
+ PTransform<PCollection<InputT>, PValue> rep =
InstanceBuilder.ofType(replacement)
.withArg(DataflowRunner.class, runner)
- .withArg(CreatePCollectionView.class, viewTransformRef.get())
+ .withArg(CreatePCollectionView.class, findCreatePCollectionView(transform))
.build();
- return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
}
@Override
- public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
- // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
- // PCollectionView that was returned.
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+ // We do not replace any of the outputs because we expect that the new PTransform will
+ // re-use the original PCollectionView that was returned.
return ImmutableMap.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e03abb9..81e7a97 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(5, steps.size());
+ assertEquals(9, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(4);
+ Step collectionToSingletonStep = steps.get(8);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}