You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/26 16:23:53 UTC

[2/4] beam git commit: Update Apex Overrides

Update Apex Overrides

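Only override CreatePCollectionView transforms (matched by their ViewFn),
instead of overriding the composite View.AsSingleton, View.AsIterable and
Combine.GloballyAsSingletonView transforms.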


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8eb09aad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8eb09aad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8eb09aad

Branch: refs/heads/master
Commit: 8eb09aad9c975f787ba8afac83394cc8b56eb94f
Parents: bd1dfdf
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 25 10:41:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 119 ++++---------------
 1 file changed, 21 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8eb09aad/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f91d8e5..c595b3f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -65,7 +64,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -111,16 +110,12 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
                 new PrimitiveCreate.Factory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new StreamingViewAsSingleton.Factory()))
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsIterable.class),
+                PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
                 new StreamingViewAsIterable.Factory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new StreamingCombineGloballyAsSingletonView.Factory()))
+                PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+                new StreamingWrapSingletonInList.Factory()))
         .add(
             PTransformOverride.of(
                 PTransformMatchers.splittableParDoMulti(),
@@ -245,117 +240,45 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
-  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+  private static class StreamingWrapSingletonInList<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
     private static final long serialVersionUID = 1L;
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+    CreatePCollectionView<T, T> transform;
 
     /**
      * Builds an instance of this class from the overridden transform.
      */
-    private StreamingCombineGloballyAsSingletonView(
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+    private StreamingWrapSingletonInList(
+        CreatePCollectionView<T, T> transform) {
       this.transform = transform;
     }
 
     @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined = input
-          .apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults().withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(combined,
-          combined.getWindowingStrategy(), transform.getInsertDefault(),
-          transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
-              combined.getCoder());
-      return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
+    public PCollectionView<T> expand(PCollection<T> input) {
+      return input
+          .apply(ParDo.of(new WrapAsList<T>()))
+          .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
     }
 
     @Override
     protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
+      return "StreamingWrapSingletonInList";
     }
 
-    static class Factory<InputT, OutputT>
+    static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<InputT>, PCollectionView<OutputT>,
-            Combine.GloballyAsSingletonView<InputT, OutputT>> {
+            PCollection<T>, PCollectionView<T>,
+            CreatePCollectionView<T, T>> {
       @Override
-      public PTransformReplacement<PCollection<InputT>, PCollectionView<OutputT>>
+      public PTransformReplacement<PCollection<T>, PCollectionView<T>>
           getReplacementTransform(
               AppliedPTransform<
-                      PCollection<InputT>, PCollectionView<OutputT>,
-                      GloballyAsSingletonView<InputT, OutputT>>
+                      PCollection<T>, PCollectionView<T>,
+                      CreatePCollectionView<T, T>>
                   transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingCombineGloballyAsSingletonView<>(transform.getTransform()));
-      }
-    }
-  }
-
-  private static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private static final long serialVersionUID = 1L;
-
-    private View.AsSingleton<T> transform;
-
-    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine
-          .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. "
-              + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-
-    static class Factory<T>
-        extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollectionView<T>, View.AsSingleton<T>> {
-      @Override
-      public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform(
-          AppliedPTransform<PCollection<T>, PCollectionView<T>, AsSingleton<T>> transform) {
-        return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingViewAsSingleton<>(transform.getTransform()));
+            new StreamingWrapSingletonInList<>(transform.getTransform()));
       }
     }
   }