You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/09 02:04:13 UTC

[1/2] incubator-beam git commit: Move expansion of Window.Bound into DirectPipelineRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master a43f9b820 -> a32a26208


Move expansion of Window.Bound into DirectPipelineRunner

In the Beam model, windowing is a primitive concept. The expansion provided
by the SDK is not implementable except via access to privileged methods
not intended for Beam pipeline authors.

This change is a precursor to eliminating these privileged methods entirely.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42969cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42969cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42969cb6

Branch: refs/heads/master
Commit: 42969cb62222744c41debe575857fb7d093ce527
Parents: 5f24cef
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 7 17:34:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 15:03:00 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DirectPipelineRunner.java       | 47 ++++++++++++++++++++
 .../inprocess/WindowEvaluatorFactory.java       | 13 +++---
 .../sdk/transforms/windowing/Window.java        | 21 ++-------
 .../cloud/dataflow/sdk/util/AssignWindows.java  | 46 +++++++++++++++++++
 4 files changed, 104 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 35e392b..57e6116 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -47,7 +47,10 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.Partition;
 import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
 import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindows;
 import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
@@ -57,6 +60,7 @@ import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
 import com.google.cloud.dataflow.sdk.util.SerializableUtils;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.common.Counter;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -255,6 +259,9 @@ public class DirectPipelineRunner
     } else if (transform instanceof GroupByKey) {
       return (OutputT)
           ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+    } else if (transform instanceof Window.Bound) {
+      return (OutputT)
+          ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform));
     } else {
       return super.apply(transform, input);
     }
@@ -400,6 +407,46 @@ public class DirectPipelineRunner
     }
   }
 
+  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+
   /**
    * Apply the override for AvroIO.Write.Bound if the user requested sharding controls
    * greater than one.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
index 0bdfac9..e553dbb 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
@@ -61,23 +61,24 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     if (fn == null) {
       return PassthroughTransformEvaluator.create(transform, outputBundle);
     }
-    return new WindowIntoEvaluator<>(fn, evaluationContext, outputBundle);
+    return new WindowIntoEvaluator<>(transform, fn, outputBundle);
   }
 
   private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
+    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+        transform;
     private final WindowFn<InputT, ?> windowFn;
-    private final InProcessEvaluationContext context;
     private final UncommittedBundle<InputT> outputBundle;
 
     @SuppressWarnings("unchecked")
     public WindowIntoEvaluator(
+        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
         WindowFn<? super InputT, ?> windowFn,
-        InProcessEvaluationContext context,
         UncommittedBundle<InputT> outputBundle) {
+      this.outputBundle = outputBundle;
+      this.transform = transform;
       // Safe contravariant cast
       this.windowFn = (WindowFn<InputT, ?>) windowFn;
-      this.context = context;
-      this.outputBundle = outputBundle;
     }
 
     @Override
@@ -98,7 +99,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
 
     @Override
     public InProcessTransformResult finishBundle() throws Exception {
-      return StepTransformResult.withoutHold(null).addOutput(outputBundle).build();
+      return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
index 1e7282d..20b3ed5 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java
@@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -360,6 +359,7 @@ public class Window {
    */
   public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
 
+
     @Nullable private final WindowFn<? super T, ?> windowFn;
     @Nullable private final Trigger<?> trigger;
     @Nullable private final AccumulationMode mode;
@@ -587,21 +587,8 @@ public class Window {
     public PCollection<T> apply(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           getOutputStrategyInternal(input.getWindowingStrategy());
-      PCollection<T> output;
-      if (windowFn != null) {
-        // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-        output = assignWindows(input, windowFn);
-      } else {
-        // If the windowFn didn't change, we just run a pass-through transform and then set the
-        // new windowing strategy.
-        output = input.apply(Window.<T>identity());
-      }
-      return output.setWindowingStrategyInternal(outputStrategy);
-    }
-
-    private <T, W extends BoundedWindow> PCollection<T> assignWindows(
-        PCollection<T> input, WindowFn<? super T, W> windowFn) {
-      return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<T, W>(windowFn)));
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), outputStrategy, input.isBounded());
     }
 
     @Override
@@ -639,7 +626,7 @@ public class Window {
    *  windows to be merged again as part of the next
    * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}.
    */
-  public static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
+  private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
     public PCollection<T> apply(PCollection<T> input) {
       WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
new file mode 100644
index 0000000..57f489d
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows according to the provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+    extends PTransform<PCollection<T>, PCollection<T>> {
+
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindows(WindowFn<? super T, W> fn) {
+    this.fn = fn;
+  }
+
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+  }
+}


[2/2] incubator-beam git commit: This closes #147

Posted by ke...@apache.org.
This closes #147


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a32a2620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a32a2620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a32a2620

Branch: refs/heads/master
Commit: a32a26208854a0875b10026d8f659ac275e37560
Parents: a43f9b8 42969cb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 8 17:04:03 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 17:04:03 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/DirectPipelineRunner.java       | 47 ++++++++++++++++++++
 .../inprocess/WindowEvaluatorFactory.java       | 13 +++---
 .../sdk/transforms/windowing/Window.java        | 21 ++-------
 .../cloud/dataflow/sdk/util/AssignWindows.java  | 46 +++++++++++++++++++
 4 files changed, 104 insertions(+), 23 deletions(-)
----------------------------------------------------------------------