You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/06 20:28:11 UTC

[2/3] beam git commit: Improve Splittable ParDo translation

Improve Splittable ParDo translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b00d95a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b00d95a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b00d95a

Branch: refs/heads/master
Commit: 1b00d95a1105d2611b985dc463da0884a6646354
Parents: 840492d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:29:16 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 6 13:13:12 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 20 +++++++++++
 .../core/construction/SplittableParDo.java      | 18 ++++++++--
 .../core/construction/ParDoTranslationTest.java | 35 +++++++++++++++++++-
 .../core/SplittableParDoViaKeyedWorkItems.java  | 10 +++++-
 .../direct/KeyedPValueTrackingVisitor.java      |  2 +-
 .../src/main/proto/beam_runner_api.proto        |  3 ++
 6 files changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe66179..34e0d86 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -144,6 +144,7 @@ public class ParDoTranslation {
 
     ParDoPayload.Builder builder = ParDoPayload.newBuilder();
     builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+    builder.setSplittable(signature.processElement().isSplittable());
     for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
       builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
     }
@@ -496,6 +497,25 @@ public class ParDoTranslation {
         .build();
   }
 
+  /** Converts {@code transform} to proto and unpacks its spec parameter as a ParDoPayload. */
+  private static ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform)
+      throws IOException {
+    return PTransformTranslation.toProto(
+            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
+        .getSpec().getParameter().unpack(ParDoPayload.class);
+  }
+
+  /** Returns true if the payload for {@code transform} has any state specs or timer specs. */
+  public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
+    ParDoPayload payload = getParDoPayload(transform);
+    return payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0;
+  }
+
+  /** Returns the {@code splittable} flag of the payload for {@code transform}. */
+  public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
+    return getParDoPayload(transform).getSplittable();
+  }
+
   private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
       throws InvalidProtocolBufferException {
     FunctionSpec spec = viewFn.getSpec();

http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index dfca7d2..665e39d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.List;
 import java.util.UUID;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -67,6 +68,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static final String SPLITTABLE_PROCESS_URN =
       "urn:beam:runners_core:transforms:splittable_process:v1";
 
+  public static final String SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN =
+      "urn:beam:runners_core:transforms:splittable_process_keyed_elements:v1";
+
+  public static final String SPLITTABLE_GBKIKWI_URN =
+      "urn:beam:runners_core:transforms:splittable_gbkikwi:v1";
+
   /**
    * Creates the transform for the given original multi-output {@link ParDo}.
    *
@@ -133,11 +140,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   /**
    * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
-   * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input
-   * {@link PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
+   * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link
+   * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
    */
   public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
-      extends PTransform<
+      extends RawPTransform<
           PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
@@ -227,6 +234,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
       return outputs;
     }
+
+    @Override
+    public String getUrn() {
+      return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 46f6a80..a8490bf 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -111,7 +112,11 @@ public class ParDoTranslationTest {
           ParDo.of(new DropElementsFn())
               .withOutputTags(
                   new TupleTag<Void>(),
-                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
+                  TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})),
+      ParDo.of(new SplittableDropElementsFn())
+          .withOutputTags(
+              new TupleTag<Void>(),
+              TupleTagList.empty()));
     }
 
     @Parameter(0)
@@ -235,6 +240,34 @@ public class ParDoTranslationTest {
     }
   }
 
+  /** A {@link DoFn} taking a {@link RestrictionTracker}; it exists only to test translation. */
+  private static class SplittableDropElementsFn extends DoFn<KV<Long, String>, Void> {
+    @ProcessElement
+    public void proc(ProcessContext c, RestrictionTracker<Integer> tracker) {
+      c.output(null);
+    }
+
+    @GetInitialRestriction
+    public Integer restriction(KV<Long, String> element) {
+      return 42;
+    }
+
+    @NewTracker
+    public RestrictionTracker<Integer> newTracker(Integer initialRestriction) {
+      throw new UnsupportedOperationException("Should never be called; only to test translation");
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      return that instanceof SplittableDropElementsFn;
+    }
+
+    @Override
+    public int hashCode() {
+      return SplittableDropElementsFn.class.hashCode();
+    }
+  }
+
   @SuppressWarnings("unused")
   private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
     private static final String BAG_STATE_ID = "bagState";

http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index b38e364..c4b086a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -23,7 +23,9 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -67,12 +69,18 @@ public class SplittableParDoViaKeyedWorkItems {
    * emits output immediately.
    */
   public static class GBKIntoKeyedWorkItems<KeyT, InputT>
-      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+      extends RawPTransform<
+          PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
     @Override
     public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
     }
+
+    @Override
+    public String getUrn() {
+      return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
+    }
   }
 
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index f9b2dae..6eadaba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.values.TupleTag;
 class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults {
 
   private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS =
-      ImmutableSet.of(
+      ImmutableSet.<Class<? extends PTransform>>of(
           SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
           DirectGroupByKeyOnly.class,
           DirectGroupAlsoByWindow.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 87e33f0..039ecb0 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -220,6 +220,9 @@ message ParDoPayload {
 
   // (Optional) A mapping of local timer names to timer specifications.
   map<string, TimerSpec> timer_specs = 5;
+
+  // Whether the DoFn of this ParDo is splittable.
+  bool splittable = 6;
 }
 
 // Parameters that a UDF might require.