Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/28 16:20:21 UTC

[2/2] beam git commit: Use CreatePCollectionView explicitly in CombineGloballyAsSingletonView

Use CreatePCollectionView explicitly in CombineGloballyAsSingletonView

Implement View.asSingleton as a CombineGloballyAsSingletonView.

This prevents writing a SingletonView that is not actually a
singleton view.
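
For context, a rough user-level sketch of the two entry points this change
touches (a minimal illustration only: the pipeline p, the input values, and the
use of Sum are assumptions, not part of this commit):

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // Assumes an already-constructed Pipeline p.
    PCollection<Integer> input = p.apply(Create.of(1, 2, 3));

    // Combine.globally(...).asSingletonView(): combine down to one value per
    // window, then create the view. Its expansion now applies
    // CreatePCollectionView directly (see the Combine.java hunk below).
    PCollectionView<Integer> combinedView =
        input.apply(Sum.integersGlobally().asSingletonView());

    // View.asSingleton(): now expands through the same combine-based path, so
    // anything that is not actually a singleton fails at combine time instead
    // of being written as a SingletonView.
    PCollectionView<Integer> directView =
        input
            .apply(Sum.integersGlobally().withoutDefaults())
            .apply(View.<Integer>asSingleton().withDefaultValue(0));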


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/329f5f2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/329f5f2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/329f5f2d

Branch: refs/heads/master
Commit: 329f5f2d0e5616cb20e8e47d68c28fa4f691a6bd
Parents: 1339dd7
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 13:29:34 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 09:20:04 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    | 52 ++++++++++++
 .../beam/runners/dataflow/DataflowRunner.java   |  9 +-
 .../DataflowPipelineTranslatorTest.java         | 16 +++-
 .../org/apache/beam/sdk/transforms/Combine.java | 23 ++---
 .../org/apache/beam/sdk/transforms/View.java    | 71 ++++++++++++++--
 .../beam/sdk/runners/TransformTreeTest.java     | 88 ++++++++++----------
 6 files changed, 196 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 81049bd..3689d3d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
@@ -58,12 +59,16 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -1388,4 +1393,51 @@ class BatchViewOverrides {
     }
   }
 
+  static class BatchCombineGloballyAsSingletonViewFactory<ElemT, ViewT>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<ElemT>, PCollectionView<ViewT>,
+          Combine.GloballyAsSingletonView<ElemT, ViewT>> {
+    private final DataflowRunner runner;
+
+    BatchCombineGloballyAsSingletonViewFactory(DataflowRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+        final GloballyAsSingletonView<ElemT, ViewT> transform) {
+      return new BatchCombineGloballyAsSingletonView<>(
+          runner, transform.getCombineFn(), transform.getFanout(), transform.getInsertDefault());
+    }
+
+    private static class BatchCombineGloballyAsSingletonView<ElemT, ViewT>
+        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+      private final DataflowRunner runner;
+      private final GlobalCombineFn<? super ElemT, ?, ViewT> combineFn;
+      private final int fanout;
+      private final boolean insertDefault;
+
+      BatchCombineGloballyAsSingletonView(
+          DataflowRunner runner,
+          GlobalCombineFn<? super ElemT, ?, ViewT> combineFn,
+          int fanout,
+          boolean insertDefault) {
+        this.runner = runner;
+        this.combineFn = combineFn;
+        this.fanout = fanout;
+        this.insertDefault = insertDefault;
+      }
+
+      @Override
+      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+        PCollection<ViewT> combined =
+            input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
+        AsSingleton<ViewT> viewAsSingleton = View.asSingleton();
+        if (insertDefault) {
+          viewAsSingleton = viewAsSingleton.withDefaultValue(combineFn.defaultValue());
+        }
+        return combined.apply(new BatchViewAsSingleton<>(runner, viewAsSingleton));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c612a20..a3249c3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -339,9 +339,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
       ptoverrides
           // State and timer pardos are implemented by expansion to GBK-then-ParDo
-          .put(PTransformMatchers.stateOrTimerParDoMulti(),
+          .put(
+              PTransformMatchers.stateOrTimerParDoMulti(),
               BatchStatefulParDoOverrides.multiOutputOverrideFactory())
-          .put(PTransformMatchers.stateOrTimerParDoSingle(),
+          .put(
+              PTransformMatchers.stateOrTimerParDoSingle(),
               BatchStatefulParDoOverrides.singleOutputOverrideFactory())
 
           // Write uses views internally
@@ -354,6 +356,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               new ReflectiveOneToOneOverrideFactory(
                   BatchViewOverrides.BatchViewAsMultimap.class, this))
           .put(
+              PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+              new BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory(this))
+          .put(
               PTransformMatchers.classEqualTo(View.AsSingleton.class),
               new ReflectiveOneToOneOverrideFactory(
                   BatchViewOverrides.BatchViewAsSingleton.class, this))

http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7e2eb5f..8c8568e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -792,12 +792,24 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(2, steps.size());
+    assertEquals(6, steps.size());
 
     Step createStep = steps.get(0);
     assertEquals("ParallelRead", createStep.getKind());
 
-    Step collectionToSingletonStep = steps.get(1);
+    Step addNullKeyStep = steps.get(1);
+    assertEquals("ParallelDo", addNullKeyStep.getKind());
+
+    Step groupByKeyStep = steps.get(2);
+    assertEquals("GroupByKey", groupByKeyStep.getKind());
+
+    Step combineGroupedValuesStep = steps.get(3);
+    assertEquals("ParallelDo", combineGroupedValuesStep.getKind());
+
+    Step dropKeysStep = steps.get(4);
+    assertEquals("ParallelDo", dropKeysStep.getKind());
+
+    Step collectionToSingletonStep = steps.get(5);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3215ffa..7295f63 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.NameUtils.NameOverride;
+import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -1577,17 +1579,16 @@ public class Combine {
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      Globally<InputT, OutputT> combineGlobally =
-          Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout);
-      if (insertDefault) {
-        return input
-            .apply(combineGlobally)
-            .apply(View.<OutputT>asSingleton().withDefaultValue(fn.defaultValue()));
-      } else {
-        return input
-            .apply(combineGlobally)
-            .apply(View.<OutputT>asSingleton());
-      }
+      PCollection<OutputT> combined =
+          input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+      return combined.apply(
+          CreatePCollectionView.<OutputT, OutputT>of(
+              PCollectionViews.singletonView(
+                  input.getPipeline(),
+                  input.getWindowingStrategy(),
+                  insertDefault,
+                  insertDefault ? fn.defaultValue() : null,
+                  combined.getCoder())));
     }
 
     public int getFanout() {
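
At the user level, the transform rewritten above is consumed as before: the
PCollectionView it produces feeds a ParDo as a side input; only the internal
expansion changed. A minimal sketch, assuming a pipeline p and illustrative
values and step names:

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    PCollection<Integer> numbers = p.apply(Create.of(1, 2, 3, 4));

    // The expansion of asSingletonView() is what the hunk above changes.
    final PCollectionView<Integer> sumView =
        numbers.apply("SumAsView", Sum.integersGlobally().asSingletonView());

    PCollection<String> ratios =
        numbers.apply(
            "DivideBySum",
            ParDo.of(
                    new DoFn<Integer, String>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(c.element() + "/" + c.sideInput(sumView));
                      }
                    })
                .withSideInputs(sumView));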

http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 1986ac5..767847d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -19,8 +19,11 @@ package org.apache.beam.sdk.transforms;
 
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -336,12 +339,68 @@ public class View {
 
     @Override
     public PCollectionView<T> expand(PCollection<T> input) {
-      return input.apply(CreatePCollectionView.<T, T>of(PCollectionViews.singletonView(
-          input.getPipeline(),
-          input.getWindowingStrategy(),
-          hasDefault,
-          defaultValue,
-          input.getCoder())));
+      Combine.Globally<T, T> singletonCombine =
+          Combine.globally(new SingletonCombineFn<>(hasDefault, input.getCoder(), defaultValue));
+      if (!hasDefault) {
+        singletonCombine = singletonCombine.withoutDefaults();
+      }
+      return input.apply(singletonCombine.asSingletonView());
+    }
+  }
+
+  private static class SingletonCombineFn<T> extends Combine.BinaryCombineFn<T> {
+    private final boolean hasDefault;
+    private final Coder<T> valueCoder;
+    private final byte[] defaultValue;
+
+    private SingletonCombineFn(boolean hasDefault, Coder<T> coder, T defaultValue) {
+      this.hasDefault = hasDefault;
+      if (hasDefault) {
+        if (defaultValue == null) {
+          this.defaultValue = null;
+          this.valueCoder = coder;
+        } else {
+          this.valueCoder = coder;
+          try {
+            this.defaultValue = CoderUtils.encodeToByteArray(coder, defaultValue);
+          } catch (CoderException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Could not encode the default value %s with the provided coder %s",
+                    defaultValue, coder),
+                e);
+          }
+        }
+      } else {
+        this.valueCoder = null;
+        this.defaultValue = null;
+      }
+    }
+
+    @Override
+    public T apply(T left, T right) {
+      throw new IllegalArgumentException(
+          "PCollection with more than one element "
+              + "accessed as a singleton view. Consider using Combine.globally().asSingletonView() to "
+              + "combine the PCollection into a single value");
+    }
+
+    public T identity() {
+      if (hasDefault) {
+        if (defaultValue == null) {
+          return null;
+        }
+        try {
+          return CoderUtils.decodeFromByteArray(valueCoder, defaultValue);
+        } catch (CoderException e) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Could not decode the default value with the provided coder %s", valueCoder),
+              e);
+        }
+      } else {
+        throw new IllegalArgumentException(
+            "Empty PCollection accessed as a singleton view. "
+                + "Consider setting withDefaultValue to provide a default value");
+      }
     }
   }
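
A sketch of the behavior the new SingletonCombineFn enforces (the pipeline p,
the coder, and the values are illustrative; exact failure timing depends on the
runner): a multi-element PCollection viewed with View.asSingleton() now fails
in the combine's apply step instead of producing an ill-formed singleton view,
while a default value only covers the empty case via identity().

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    PCollection<Integer> many = p.apply(Create.of(1, 2, 3));

    // Expands through the combine above. With more than one element,
    // SingletonCombineFn.apply(left, right) is reached at execution time and
    // the pipeline fails with the IllegalArgumentException shown in the hunk.
    PCollectionView<Integer> badView = many.apply(View.<Integer>asSingleton());

    // With a default value, only the empty-PCollection case is covered: the
    // default is decoded and returned from identity() when there is no element.
    PCollection<Integer> none = p.apply("Empty", Create.empty(VarIntCoder.of()));
    PCollectionView<Integer> withDefault =
        none.apply(View.<Integer>asSingleton().withDefaultValue(0));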
 

http://git-wip-us.apache.org/repos/asf/beam/blob/329f5f2d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 53bc114..2d55005 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -64,7 +64,7 @@ public class TransformTreeTest {
   enum TransformsSeen {
     READ,
     WRITE,
-    COMBINE_GLOBALLY
+    SAMPLE
   }
 
   /**
@@ -112,8 +112,6 @@ public class TransformTreeTest {
     }
   }
 
-  // Builds a pipeline containing a composite operation (Pick), then
-  // visits the nodes and verifies that the hierarchy was captured.
   @Test
   public void testCompositeCapture() throws Exception {
     p.enableAbandonedNodeEnforcement(false);
@@ -121,8 +119,10 @@ public class TransformTreeTest {
     File inputFile = tmpFolder.newFile();
     File outputFile = tmpFolder.newFile();
 
+    final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample =
+        Sample.fixedSizeGlobally(10);
     p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
-        .apply(Combine.globally(Sample.<String>combineFn(10)))
+        .apply(sample)
         .apply(Flatten.<String>iterables())
         .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
@@ -131,46 +131,50 @@ public class TransformTreeTest {
     final EnumSet<TransformsSeen> left =
         EnumSet.noneOf(TransformsSeen.class);
 
-    p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
-      @Override
-      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-        PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Combine.Globally) {
-          assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
-          assertNotNull(node.getEnclosingNode());
-          assertTrue(node.isCompositeNode());
-        } else if (transform instanceof Write) {
-          assertTrue(visited.add(TransformsSeen.WRITE));
-          assertNotNull(node.getEnclosingNode());
-          assertTrue(node.isCompositeNode());
-        }
-        assertThat(transform, not(instanceOf(Read.Bounded.class)));
-        return CompositeBehavior.ENTER_TRANSFORM;
-      }
-
-      @Override
-      public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        PTransform<?, ?> transform = node.getTransform();
-        if (transform instanceof Combine.Globally) {
-          assertTrue(left.add(TransformsSeen.COMBINE_GLOBALLY));
-        }
-      }
-
-      @Override
-      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-        PTransform<?, ?> transform = node.getTransform();
-        // Pick is a composite, should not be visited here.
-        assertThat(transform, not(instanceOf(Combine.Globally.class)));
-        assertThat(transform, not(instanceOf(Write.class)));
-        if (transform instanceof Read.Bounded
-            && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
-          assertTrue(visited.add(TransformsSeen.READ));
-        }
-      }
-    });
+    p.traverseTopologically(
+        new Pipeline.PipelineVisitor.Defaults() {
+          @Override
+          public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+            if (node.isRootNode()) {
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+            PTransform<?, ?> transform = node.getTransform();
+            if (sample.getClass().equals(transform.getClass())) {
+              assertTrue(visited.add(TransformsSeen.SAMPLE));
+              assertNotNull(node.getEnclosingNode());
+              assertTrue(node.isCompositeNode());
+            } else if (transform instanceof Write) {
+              assertTrue(visited.add(TransformsSeen.WRITE));
+              assertNotNull(node.getEnclosingNode());
+              assertTrue(node.isCompositeNode());
+            }
+            assertThat(transform, not(instanceOf(Read.Bounded.class)));
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(TransformHierarchy.Node node) {
+            PTransform<?, ?> transform = node.getTransform();
+            if (!node.isRootNode() && transform.getClass().equals(sample.getClass())) {
+              assertTrue(left.add(TransformsSeen.SAMPLE));
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            PTransform<?, ?> transform = node.getTransform();
+            // Composites (the sample and write transforms above) should not be visited here.
+            assertThat(transform, not(instanceOf(Combine.Globally.class)));
+            assertThat(transform, not(instanceOf(Write.class)));
+            if (transform instanceof Read.Bounded
+                && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
+              assertTrue(visited.add(TransformsSeen.READ));
+            }
+          }
+        });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));
-    assertTrue(left.equals(EnumSet.of(TransformsSeen.COMBINE_GLOBALLY)));
+    assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE)));
   }
 
   @Test(expected = IllegalArgumentException.class)