You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:02 UTC

[3/5] beam git commit: Port ViewOverrideFactory to SDK-agnostic APIs

Port ViewOverrideFactory to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c5b57ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c5b57ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c5b57ea

Branch: refs/heads/master
Commit: 8c5b57ea8445cd50a35c6dffb460dcf0f426e700
Parents: b4c7716
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:26:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../CreatePCollectionViewTranslation.java       |  4 +-
 .../runners/direct/ViewOverrideFactory.java     | 48 ++++++++++++--------
 .../direct/ViewEvaluatorFactoryTest.java        |  3 +-
 .../runners/direct/ViewOverrideFactoryTest.java | 23 ++++++++--
 .../beam/sdk/values/PCollectionViews.java       | 10 ++++
 5 files changed, 62 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index aa24909..8fc99b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation {
   @Deprecated
   public static <ElemT, ViewT> PCollectionView<ViewT> getView(
       AppliedPTransform<
-              PCollection<ElemT>, PCollectionView<ViewT>,
-              PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+              PCollection<ElemT>, PCollection<ElemT>,
+              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           application)
       throws IOException {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 06a7388..5dcf016 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,8 +18,9 @@
 
 package org.apache.beam.runners.direct;
 
+import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -43,16 +44,30 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 class ViewOverrideFactory<ElemT, ViewT>
     implements PTransformOverrideFactory<
-        PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+    PCollection<ElemT>, PCollection<ElemT>,
+        PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
 
   @Override
   public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+              PCollection<ElemT>, PCollection<ElemT>,
+              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           transform) {
-    return PTransformReplacement.of(
+
+    PCollectionView<ViewT> view;
+    try {
+      view = CreatePCollectionViewTranslation.getView(transform);
+    } catch (IOException exc) {
+      throw new RuntimeException(
+          String.format(
+              "Could not extract %s from transform %s",
+              PCollectionView.class.getSimpleName(), transform),
+          exc);
+    }
+
+      return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new GroupAndWriteView<>(transform.getTransform()));
+        new GroupAndWriteView<ElemT, ViewT>(view));
   }
 
   @Override
@@ -63,11 +78,11 @@ class ViewOverrideFactory<ElemT, ViewT>
 
   /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
   static class GroupAndWriteView<ElemT, ViewT>
-      extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
+      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+    private final PCollectionView<ViewT> view;
 
-    private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
+    private GroupAndWriteView(PCollectionView<ViewT> view) {
+      this.view = view;
     }
 
     @Override
@@ -77,14 +92,9 @@ class ViewOverrideFactory<ElemT, ViewT>
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
           .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
+          .apply(new WriteView<ElemT, ViewT>(view));
       return input;
     }
-
-    @Override
-    protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() {
-      return og;
-    }
   }
 
   /**
@@ -96,10 +106,10 @@ class ViewOverrideFactory<ElemT, ViewT>
    */
   static final class WriteView<ElemT, ViewT>
       extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
+    private final PCollectionView<ViewT> view;
 
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
+    WriteView(PCollectionView<ViewT> view) {
+      this.view = view;
     }
 
     @Override
@@ -112,7 +122,7 @@ class ViewOverrideFactory<ElemT, ViewT>
 
     @SuppressWarnings("deprecation")
     public PCollectionView<ViewT> getView() {
-      return og.getView();
+      return view;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index ad1aecc..5bc48b7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -66,7 +66,8 @@ public class ViewEvaluatorFactoryTest {
             .apply(GroupByKey.<Void, String>create())
             .apply(Values.<Iterable<String>>create());
     PCollection<Iterable<String>> view =
-        concat.apply(new ViewOverrideFactory.WriteView<>(createView));
+        concat.apply(
+            new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94728c7..6af9273 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -36,8 +37,11 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -67,7 +71,7 @@ public class ViewOverrideFactoryTest implements Serializable {
             factory.getReplacementTransform(
                 AppliedPTransform
                     .<PCollection<Integer>, PCollection<Integer>,
-                        CreatePCollectionView<Integer, List<Integer>>>
+                        PTransform<PCollection<Integer>, PCollection<Integer>>>
                         of(
                             "foo",
                             ints.expand(),
@@ -102,7 +106,7 @@ public class ViewOverrideFactoryTest implements Serializable {
         factory.getReplacementTransform(
             AppliedPTransform
                 .<PCollection<Integer>, PCollection<Integer>,
-                    CreatePCollectionView<Integer, List<Integer>>>
+                    PTransform<PCollection<Integer>, PCollection<Integer>>>
                     of(
                         "foo",
                         ints.expand(),
@@ -120,8 +124,19 @@ public class ViewOverrideFactoryTest implements Serializable {
                   "There should only be one WriteView primitive in the graph",
                   writeViewVisited.getAndSet(true),
                   is(false));
-              PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
-              assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+              PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
+
+              // replacementView.getPCollection() is null, but that is not a requirement,
+              // so it is not asserted one way or the other.
+              assertThat(
+                  replacementView.getTagInternal(),
+                  equalTo(view.getTagInternal()));
+              assertThat(
+                  replacementView.getViewFn(),
+                  Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
+              assertThat(
+                  replacementView.getWindowMappingFn(),
+                  Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
               assertThat(node.getInputs().entrySet(), hasSize(1));
             }
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 5e2e2c3..0c04370 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -282,6 +282,16 @@ public class PCollectionViews {
             }
           }));
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof ListViewFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return ListViewFn.class.hashCode();
+    }
   }
 
   /**