You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/08 19:00:45 UTC

[1/3] beam git commit: DirectRunner override matchers using Runner API

Repository: beam
Updated Branches:
  refs/heads/master b53e6f0dc -> 4aef93854


DirectRunner override matchers using Runner API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8d90878
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8d90878
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8d90878

Branch: refs/heads/master
Commit: d8d9087877c01f1786271726a541fb3eeda7f939
Parents: ca7b9c2
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:31:16 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8d90878/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dbd1ec4..136ccf3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -42,12 +43,9 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -230,33 +228,33 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
                 new WriteWithShardingFactory())) /* Uses a view internally. */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
                 new ViewOverrideFactory())) /* Uses pardos and GBKs */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(TestStream.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
                 new DirectTestStreamFactory(this))) /* primitive */
         // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
         // primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))
         // state and timer pardos are implemented in terms of simple ParDos and extra primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                PTransformMatchers.urnEqualTo(
+                    SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
                 new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(
-                    SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class),
+                PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN),
                 new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(GroupByKey.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
                 new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
         .build();
   }


[3/3] beam git commit: This closes #3239: [BEAM-2371] Port DirectRunner override matchers to language-agnostic construction APIs

Posted by ke...@apache.org.
This closes #3239: [BEAM-2371] Port DirectRunner override matchers to language-agnostic construction APIs

  DirectRunner override matchers using Runner API
  Add Runner API oriented PTransformMatchers for DirectRunner overrides


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aef9385
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aef9385
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aef9385

Branch: refs/heads/master
Commit: 4aef9385477f0d13bdaa345cb5a629b2fd00b867
Parents: b53e6f0 d8d9087
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 11:37:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:37:15 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 94 +++++++++++++++++++-
 .../construction/PTransformTranslation.java     |  7 +-
 .../construction/PTransformMatchersTest.java    | 32 +++++++
 .../beam/runners/direct/DirectRunner.java       | 20 ++---
 4 files changed, 137 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Add Runner API oriented PTransformMatchers for DirectRunner overrides

Posted by ke...@apache.org.
Add Runner API oriented PTransformMatchers for DirectRunner overrides


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca7b9c28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca7b9c28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca7b9c28

Branch: refs/heads/master
Commit: ca7b9c288151d318898ab000b91d26fcf62046ca
Parents: b53e6f0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:29:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 94 +++++++++++++++++++-
 .../construction/PTransformTranslation.java     |  7 +-
 .../construction/PTransformMatchersTest.java    | 32 +++++++
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index bfe24a0..c339891 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.construction;
 
 import com.google.common.base.MoreObjects;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -50,6 +51,34 @@ public class PTransformMatchers {
   private PTransformMatchers() {}
 
   /**
+   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if its URN is equal to
+   * {@code urn}. A transform whose URN is not known never matches.
+   */
+  public static PTransformMatcher urnEqualTo(String urn) {
+    return new EqualUrnPTransformMatcher(urn);
+  }
+
+  private static class EqualUrnPTransformMatcher implements PTransformMatcher {
+    private final String urn;
+
+    private EqualUrnPTransformMatcher(String urn) {
+      this.urn = urn;
+    }
+
+    @Override
+    public boolean matches(AppliedPTransform<?, ?, ?> application) {
+      return urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", urn)
+          .toString();
+    }
+  }
+
+  /**
    * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the
    * {@link PTransform} is equal to the {@link Class} provided ot this matcher.
    */
@@ -151,6 +180,68 @@ public class PTransformMatchers {
   }
 
   /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has a splittable {@link
+   * DoFn}.
+   */
+  public static PTransformMatcher splittableParDo() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+            PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+          try {
+            return ParDoTranslation.isSplittable(application);
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Transform %s with URN %s could not be translated",
+                    application.getFullName(), PTransformTranslation.PAR_DO_TRANSFORM_URN),
+                e);
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper("SplittableParDoMatcher").toString();
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo} (identified by URN) that uses state
+   * or timers, as reported by {@link ParDoTranslation#usesStateOrTimers}.
+   */
+  public static PTransformMatcher stateOrTimerParDo() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+            PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+          try {
+            return ParDoTranslation.usesStateOrTimers(application);
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Transform %s with URN %s could not be translated",
+                    application.getFullName(), PTransformTranslation.PAR_DO_TRANSFORM_URN),
+                e);
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString();
+      }
+    };
+  }
+
+  /**
    * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn}
    * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and
    * {@link DoFnSignature#usesTimers()}.
@@ -268,7 +359,8 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof WriteFiles) {
+        if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(
+            PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
           WriteFiles write = (WriteFiles) application.getTransform();
           return write.getSharding() == null && write.getNumShards() == null;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 32ecf43..bae7b05 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -179,13 +179,12 @@ public class PTransformTranslation {
    * Returns the URN for the transform if it is known, otherwise throws.
    */
   public static String urnForTransform(PTransform<?, ?> transform) {
-    TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
-    if (translator == null) {
+    String urn = urnForTransformOrNull(transform);
+    if (urn == null) {
       throw new IllegalStateException(
           String.format("No translator known for %s", transform.getClass().getName()));
     }
-
-    return translator.getUrn(transform);
+    return urn;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 2497598..6459849 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,8 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
@@ -95,9 +97,14 @@ public class PTransformMatchersTest implements Serializable {
     PCollection<KV<String, Integer>> input =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    input.setName("dummy input");
+    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
     PCollection<Integer> output =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    output.setName("dummy output");
+    output.setCoder(VarIntCoder.of());
 
     return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
   }
@@ -272,6 +279,18 @@ public class PTransformMatchersTest implements Serializable {
   }
 
   @Test
+  public void parDoSplittable() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true));
+    assertThat(PTransformMatchers.stateOrTimerParDo().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
   public void parDoMultiWithState() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(
@@ -284,6 +303,19 @@ public class PTransformMatchersTest implements Serializable {
   }
 
   @Test
+  public void parDoWithState() {
+    AppliedPTransform<?, ?, ?> statefulApplication =
+        getAppliedTransform(
+            ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true));
+    assertThat(PTransformMatchers.splittableParDo().matches(statefulApplication), is(false));
+    AppliedPTransform<?, ?, ?> splittableApplication =
+        getAppliedTransform(
+            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false));
+  }
+
+  @Test
   public void parDoMultiWithTimers() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(