You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:17 UTC

[24/50] [abbrv] incubator-beam git commit: [Spark] Elide assigning windows when WindowFn is null

[Spark] Elide assigning windows when WindowFn is null

Previously, when translating a Window.Bound transform, the case
where the WindowFn was null was missed, resulting in a
NullPointerException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8278e5f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8278e5f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8278e5f7

Branch: refs/heads/python-sdk
Commit: 8278e5f78f36fb48fae994ee7abcc1485db84189
Parents: 0a7246d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 15 10:42:59 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:29 2016 -0700

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java     | 17 ++++++++++-------
 .../apache/beam/sdk/util/AssignWindowsDoFn.java    |  9 ++++++++-
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8278e5f7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ebceb6b..34a0ede 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -689,8 +689,6 @@ public final class TransformTranslator {
     rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
   }
 
-  private static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
-
   private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
@@ -698,14 +696,19 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<T>, ?> inRDD =
             (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
-        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+
+        @SuppressWarnings("unchecked")
+        WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
+
         // Avoid running assign windows if both source and destination are global window
-        if (context.getInput(transform).getWindowingStrategy().getWindowFn()
-                instanceof GlobalWindows
-            && windowFn instanceof GlobalWindows) {
+        // or if the user has not specified the WindowFn (meaning they are just messing
+        // with triggering or allowed lateness)
+        if (windowFn == null
+            || (context.getInput(transform).getWindowingStrategy().getWindowFn()
+                    instanceof GlobalWindows
+                && windowFn instanceof GlobalWindows)) {
           context.setOutputRDD(transform, inRDD);
         } else {
-          @SuppressWarnings("unchecked")
           DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           DoFnFunction<T, T> dofn =
               new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8278e5f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
index e71a47e..caec40e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -37,7 +39,12 @@ public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
   private WindowFn<? super T, W> fn;
 
   public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
-    this.fn = fn;
+    this.fn =
+        checkNotNull(
+            fn,
+            "%s provided to %s cannot be null",
+            WindowFn.class.getSimpleName(),
+            AssignWindowsDoFn.class.getSimpleName());
   }
 
   @Override