You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/08 19:28:51 UTC

[1/2] incubator-beam git commit: Add PTransformOverrideFactory

Repository: incubator-beam
Updated Branches:
  refs/heads/master 245cffa5e -> 6348a1fe2


Add PTransformOverrideFactory

This constructs an appropriate custom PTransform override for the runner
it is meant to be used in, which is applied in place of the original
PTransform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d1486c66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d1486c66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d1486c66

Branch: refs/heads/master
Commit: d1486c66619891903ed4c0c57c354af06ffbd23c
Parents: 1aee017
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 10:03:25 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 10:27:50 2016 -0700

----------------------------------------------------------------------
 .../inprocess/GroupByKeyEvaluatorFactory.java   | 21 +++++++++-
 .../sdk/runners/inprocess/InProcessCreate.java  | 18 +++++++++
 .../inprocess/InProcessPipelineRunner.java      | 41 +++++++-------------
 .../inprocess/PTransformOverrideFactory.java    | 33 ++++++++++++++++
 .../runners/inprocess/ViewEvaluatorFactory.java | 28 +++++++++++--
 5 files changed, 109 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 0518eec..4b478ad 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -42,6 +42,8 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.ArrayList;
@@ -179,10 +181,27 @@ class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   /**
+   * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+   */
+  public static final class InProcessGroupByKeyOverrideFactory
+      implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (!(transform instanceof GroupByKey)) {
+        return transform; // Only GroupByKey is replaced; anything else passes through as-is.
+      }
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      PTransform<InputT, OutputT> replacement = new InProcessGroupByKey((GroupByKey) transform);
+      return replacement;
+    }
+  }
+
+  /**
    * An in-memory implementation of the {@link GroupByKey} primitive as a composite
    * {@link PTransform}.
    */
-  public static final class InProcessGroupByKey<K, V>
+  private static final class InProcessGroupByKey<K, V>
       extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
     private final GroupByKey<K, V> original;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
index efda8fc..00587ea 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
@@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -53,6 +54,23 @@ import javax.annotation.Nullable;
 class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
   private final Create.Values<T> original;
 
+  /**
+   * A {@link PTransformOverrideFactory} for {@link Create.Values} PTransforms.
+   */
+  public static class InProcessCreateOverrideFactory implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (!(transform instanceof Create.Values)) {
+        return transform; // Only Create.Values is replaced; anything else passes through as-is.
+      }
+      @SuppressWarnings("unchecked")
+      PTransform<InputT, OutputT> replacement =
+          (PTransform<InputT, OutputT>) from((Create.Values<OutputT>) transform);
+      return replacement;
+    }
+  }
+
   public static <T> InProcessCreate<T> from(Create.Values<T> original) {
     return new InProcessCreate<>(original);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..9ce4430 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -26,15 +26,16 @@ import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
 import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
 import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
 import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessCreate.InProcessCreateOverrideFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
 import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
 import com.google.cloud.dataflow.sdk.util.UserCodeException;
@@ -75,14 +76,12 @@ public class InProcessPipelineRunner
    * type of transform it is overriding.
    */
   @SuppressWarnings("rawtypes")
-  private static Map<Class<? extends PTransform>, Class<? extends PTransform>>
+  private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
       defaultTransformOverrides =
-          ImmutableMap.<Class<? extends PTransform>, Class<? extends PTransform>>builder()
-              .put(Create.Values.class, InProcessCreate.class)
-              .put(GroupByKey.class, InProcessGroupByKey.class)
-              .put(
-                  CreatePCollectionView.class,
-                  ViewEvaluatorFactory.InProcessCreatePCollectionView.class)
+          ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+              .put(Create.Values.class, new InProcessCreateOverrideFactory())
+              .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+              .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
               .build();
 
   /**
@@ -191,28 +190,14 @@ public class InProcessPipelineRunner
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    Class<?> overrideClass = defaultTransformOverrides.get(transform.getClass());
-    if (overrideClass != null) {
-      // It is the responsibility of whoever constructs overrides to ensure this is type safe.
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> transformClass =
-          (Class<PTransform<InputT, OutputT>>) transform.getClass();
+    PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
+    if (overrideFactory != null) {
+      PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
 
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> customTransformClass =
-          (Class<PTransform<InputT, OutputT>>) overrideClass;
-
-      PTransform<InputT, OutputT> customTransform =
-          InstanceBuilder.ofType(customTransformClass)
-          .withArg(transformClass, transform)
-          .build();
-
-      // This overrides the contents of the apply method without changing the TransformTreeNode that
-      // is generated by the PCollection application.
       return super.apply(customTransform, input);
-    } else {
-      return super.apply(transform, input);
     }
+    // If there is no override, or we should not apply the override, apply the original transform
+    return super.apply(transform, input);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
new file mode 100644
index 0000000..77fa585
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+interface PTransformOverrideFactory {
+  /**
+   * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
+   * Otherwise, return the input {@link PTransform}.
+   *
+   * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
+   */
+  <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index f78fc49..112ea0c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -29,6 +29,8 @@ import com.google.cloud.dataflow.sdk.transforms.WithKeys;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,7 +52,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       InProcessPipelineRunner.CommittedBundle<?> inputBundle,
       InProcessEvaluationContext evaluationContext) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createEvaluator(
+    TransformEvaluator<T> evaluator = createEvaluator(
             (AppliedPTransform) application, evaluationContext);
     return evaluator;
   }
@@ -80,11 +82,26 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     };
   }
 
+  /** A {@link PTransformOverrideFactory} for {@link CreatePCollectionView} PTransforms. */
+  public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput>
+        PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
+      if (transform instanceof CreatePCollectionView) {
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        PTransform<InputT, OutputT> createView = (PTransform<InputT, OutputT>)
+            new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
+        return createView;
+      }
+      return transform; // Not applicable: hand back the input unchanged.
+    }
+  }
+
   /**
    * An in-process override for {@link CreatePCollectionView}.
    */
-  public static class InProcessCreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+  private static class InProcessCreatePCollectionView<ElemT, ViewT>
+      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
     private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
@@ -99,6 +116,11 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
           .apply(Values.<Iterable<ElemT>>create())
           .apply(new WriteView<ElemT, ViewT>(og));
     }
+
+    @Override
+    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+      return og;
+    }
   }
 
   /**


[2/2] incubator-beam git commit: This closes #131

Posted by ke...@apache.org.
This closes #131


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6348a1fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6348a1fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6348a1fe

Branch: refs/heads/master
Commit: 6348a1fe24683d531e053d5be1e56e38635d2957
Parents: 245cffa d1486c6
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 8 10:28:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 10:28:32 2016 -0700

----------------------------------------------------------------------
 .../inprocess/GroupByKeyEvaluatorFactory.java   | 21 +++++++++-
 .../sdk/runners/inprocess/InProcessCreate.java  | 18 ++++++++
 .../inprocess/InProcessPipelineRunner.java      | 43 +++++++-------------
 .../inprocess/PTransformOverrideFactory.java    | 33 +++++++++++++++
 .../runners/inprocess/ViewEvaluatorFactory.java | 28 +++++++++++--
 5 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6348a1fe/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6348a1fe/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --cc sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index f5b7f3c,9ce4430..fa93994
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@@ -33,9 -35,7 +35,8 @@@ import com.google.cloud.dataflow.sdk.tr
  import com.google.cloud.dataflow.sdk.transforms.Create;
  import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
  import com.google.cloud.dataflow.sdk.transforms.PTransform;
 +import com.google.cloud.dataflow.sdk.transforms.ParDo;
  import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
- import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
  import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
  import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
  import com.google.cloud.dataflow.sdk.util.UserCodeException;