You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/28 16:20:20 UTC
[1/2] beam git commit: This closes #2156
Repository: beam
Updated Branches:
refs/heads/master 1339dd706 -> 118da8171
This closes #2156
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/118da817
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/118da817
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/118da817
Branch: refs/heads/master
Commit: 118da81716ef3f8fc4dc035bd998eff821bb9b2d
Parents: 1339dd7 329f5f2
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 28 09:20:04 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 09:20:04 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 52 ++++++++++++
.../beam/runners/dataflow/DataflowRunner.java | 9 +-
.../DataflowPipelineTranslatorTest.java | 16 +++-
.../org/apache/beam/sdk/transforms/Combine.java | 23 ++---
.../org/apache/beam/sdk/transforms/View.java | 71 ++++++++++++++--
.../beam/sdk/runners/TransformTreeTest.java | 88 ++++++++++----------
6 files changed, 196 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Use CreatePCollectionView explicitly in
CombineGloballyAsSingletonView
Posted by tg...@apache.org.
Use CreatePCollectionView explicitly in CombineGloballyAsSingletonView
Implement View.asSingleton as a CombineGloballyAsSingletonView.
This prevents writing a SingletonView for any view that is not
actually a singleton view.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/329f5f2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/329f5f2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/329f5f2d
Branch: refs/heads/master
Commit: 329f5f2d0e5616cb20e8e47d68c28fa4f691a6bd
Parents: 1339dd7
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 13:29:34 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 09:20:04 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 52 ++++++++++++
.../beam/runners/dataflow/DataflowRunner.java | 9 +-
.../DataflowPipelineTranslatorTest.java | 16 +++-
.../org/apache/beam/sdk/transforms/Combine.java | 23 ++---
.../org/apache/beam/sdk/transforms/View.java | 71 ++++++++++++++--
.../beam/sdk/runners/TransformTreeTest.java | 88 ++++++++++----------
6 files changed, 196 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 81049bd..3689d3d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
@@ -58,12 +59,16 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.AsSingleton;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -1388,4 +1393,51 @@ class BatchViewOverrides {
}
}
+ static class BatchCombineGloballyAsSingletonViewFactory<ElemT, ViewT>
+ extends SingleInputOutputOverrideFactory<
+ PCollection<ElemT>, PCollectionView<ViewT>,
+ Combine.GloballyAsSingletonView<ElemT, ViewT>> {
+ private final DataflowRunner runner;
+
+ BatchCombineGloballyAsSingletonViewFactory(DataflowRunner runner) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+ final GloballyAsSingletonView<ElemT, ViewT> transform) {
+ return new BatchCombineGloballyAsSingletonView<>(
+ runner, transform.getCombineFn(), transform.getFanout(), transform.getInsertDefault());
+ }
+
+ private static class BatchCombineGloballyAsSingletonView<ElemT, ViewT>
+ extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ private final DataflowRunner runner;
+ private final GlobalCombineFn<? super ElemT, ?, ViewT> combineFn;
+ private final int fanout;
+ private final boolean insertDefault;
+
+ BatchCombineGloballyAsSingletonView(
+ DataflowRunner runner,
+ GlobalCombineFn<? super ElemT, ?, ViewT> combineFn,
+ int fanout,
+ boolean insertDefault) {
+ this.runner = runner;
+ this.combineFn = combineFn;
+ this.fanout = fanout;
+ this.insertDefault = insertDefault;
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ PCollection<ViewT> combined =
+ input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
+ AsSingleton<ViewT> viewAsSingleton = View.asSingleton();
+ if (insertDefault) {
+ viewAsSingleton.withDefaultValue(combineFn.defaultValue());
+ }
+ return combined.apply(new BatchViewAsSingleton<>(runner, viewAsSingleton));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c612a20..a3249c3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -339,9 +339,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
ptoverrides
// State and timer pardos are implemented by expansion to GBK-then-ParDo
- .put(PTransformMatchers.stateOrTimerParDoMulti(),
+ .put(
+ PTransformMatchers.stateOrTimerParDoMulti(),
BatchStatefulParDoOverrides.multiOutputOverrideFactory())
- .put(PTransformMatchers.stateOrTimerParDoSingle(),
+ .put(
+ PTransformMatchers.stateOrTimerParDoSingle(),
BatchStatefulParDoOverrides.singleOutputOverrideFactory())
// Write uses views internally
@@ -354,6 +356,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsMultimap.class, this))
.put(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory(this))
+ .put(
PTransformMatchers.classEqualTo(View.AsSingleton.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsSingleton.class, this))
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7e2eb5f..8c8568e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -792,12 +792,24 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(2, steps.size());
+ assertEquals(6, steps.size());
Step createStep = steps.get(0);
assertEquals("ParallelRead", createStep.getKind());
- Step collectionToSingletonStep = steps.get(1);
+ Step addNullKeyStep = steps.get(1);
+ assertEquals("ParallelDo", addNullKeyStep.getKind());
+
+ Step groupByKeyStep = steps.get(2);
+ assertEquals("GroupByKey", groupByKeyStep.getKind());
+
+ Step combineGroupedValuesStep = steps.get(3);
+ assertEquals("ParallelDo", combineGroupedValuesStep.getKind());
+
+ Step dropKeysStep = steps.get(4);
+ assertEquals("ParallelDo", dropKeysStep.getKind());
+
+ Step collectionToSingletonStep = steps.get(5);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3215ffa..7295f63 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.NameUtils.NameOverride;
+import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -1577,17 +1579,16 @@ public class Combine {
@Override
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- Globally<InputT, OutputT> combineGlobally =
- Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout);
- if (insertDefault) {
- return input
- .apply(combineGlobally)
- .apply(View.<OutputT>asSingleton().withDefaultValue(fn.defaultValue()));
- } else {
- return input
- .apply(combineGlobally)
- .apply(View.<OutputT>asSingleton());
- }
+ PCollection<OutputT> combined =
+ input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+ return combined.apply(
+ CreatePCollectionView.<OutputT, OutputT>of(
+ PCollectionViews.singletonView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ insertDefault,
+ insertDefault ? fn.defaultValue() : null,
+ combined.getCoder())));
}
public int getFanout() {
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 1986ac5..767847d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -19,8 +19,11 @@ package org.apache.beam.sdk.transforms;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -336,12 +339,68 @@ public class View {
@Override
public PCollectionView<T> expand(PCollection<T> input) {
- return input.apply(CreatePCollectionView.<T, T>of(PCollectionViews.singletonView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- hasDefault,
- defaultValue,
- input.getCoder())));
+ Combine.Globally<T, T> singletonCombine =
+ Combine.globally(new SingletonCombineFn<>(hasDefault, input.getCoder(), defaultValue));
+ if (!hasDefault) {
+ singletonCombine = singletonCombine.withoutDefaults();
+ }
+ return input.apply(singletonCombine.asSingletonView());
+ }
+ }
+
+ private static class SingletonCombineFn<T> extends Combine.BinaryCombineFn<T> {
+ private final boolean hasDefault;
+ private final Coder<T> valueCoder;
+ private final byte[] defaultValue;
+
+ private SingletonCombineFn(boolean hasDefault, Coder<T> coder, T defaultValue) {
+ this.hasDefault = hasDefault;
+ if (hasDefault) {
+ if (defaultValue == null) {
+ this.defaultValue = null;
+ this.valueCoder = coder;
+ } else {
+ this.valueCoder = coder;
+ try {
+ this.defaultValue = CoderUtils.encodeToByteArray(coder, defaultValue);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not encode the default value %s with the provided coder %s",
+ defaultValue, coder));
+ }
+ }
+ } else {
+ this.valueCoder = null;
+ this.defaultValue = null;
+ }
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException(
+ "PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ public T identity() {
+ if (hasDefault) {
+ if (defaultValue == null) {
+ return null;
+ }
+ try {
+ return CoderUtils.decodeFromByteArray(valueCoder, defaultValue);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not decode the default value with the provided coder %s", valueCoder));
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 53bc114..2d55005 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -64,7 +64,7 @@ public class TransformTreeTest {
enum TransformsSeen {
READ,
WRITE,
- COMBINE_GLOBALLY
+ SAMPLE
}
/**
@@ -112,8 +112,6 @@ public class TransformTreeTest {
}
}
- // Builds a pipeline containing a composite operation (Pick), then
- // visits the nodes and verifies that the hierarchy was captured.
@Test
public void testCompositeCapture() throws Exception {
p.enableAbandonedNodeEnforcement(false);
@@ -121,8 +119,10 @@ public class TransformTreeTest {
File inputFile = tmpFolder.newFile();
File outputFile = tmpFolder.newFile();
+ final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample =
+ Sample.fixedSizeGlobally(10);
p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
- .apply(Combine.globally(Sample.<String>combineFn(10)))
+ .apply(sample)
.apply(Flatten.<String>iterables())
.apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
@@ -131,46 +131,50 @@ public class TransformTreeTest {
final EnumSet<TransformsSeen> left =
EnumSet.noneOf(TransformsSeen.class);
- p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Combine.Globally) {
- assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
- assertNotNull(node.getEnclosingNode());
- assertTrue(node.isCompositeNode());
- } else if (transform instanceof Write) {
- assertTrue(visited.add(TransformsSeen.WRITE));
- assertNotNull(node.getEnclosingNode());
- assertTrue(node.isCompositeNode());
- }
- assertThat(transform, not(instanceOf(Read.Bounded.class)));
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- PTransform<?, ?> transform = node.getTransform();
- if (transform instanceof Combine.Globally) {
- assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
- }
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- PTransform<?, ?> transform = node.getTransform();
- // Pick is a composite, should not be visited here.
- assertThat(transform, not(instanceOf(Combine.Globally.class)));
- assertThat(transform, not(instanceOf(Write.class)));
- if (transform instanceof Read.Bounded
- && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
- assertTrue(visited.add(TransformsSeen.READ));
- }
- }
- });
+ p.traverseTopologically(
+ new Pipeline.PipelineVisitor.Defaults() {
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ if (node.isRootNode()) {
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ PTransform<?, ?> transform = node.getTransform();
+ if (sample.getClass().equals(transform.getClass())) {
+ assertTrue(visited.add(TransformsSeen.SAMPLE));
+ assertNotNull(node.getEnclosingNode());
+ assertTrue(node.isCompositeNode());
+ } else if (transform instanceof Write) {
+ assertTrue(visited.add(TransformsSeen.WRITE));
+ assertNotNull(node.getEnclosingNode());
+ assertTrue(node.isCompositeNode());
+ }
+ assertThat(transform, not(instanceOf(Read.Bounded.class)));
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (!node.isRootNode() && transform.getClass().equals(sample.getClass())) {
+ assertTrue(left.add(TransformsSeen.SAMPLE));
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ PTransform<?, ?> transform = node.getTransform();
+ // Pick is a composite, should not be visited here.
+ assertThat(transform, not(instanceOf(Combine.Globally.class)));
+ assertThat(transform, not(instanceOf(Write.class)));
+ if (transform instanceof Read.Bounded
+ && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
+ assertTrue(visited.add(TransformsSeen.READ));
+ }
+ }
+ });
assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
- assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
+ assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE)));
}
@Test(expected = IllegalArgumentException.class)