You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/04/19 21:40:29 UTC

[2/5] incubator-beam git commit: Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD) windowing functions

Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD)
windowing functions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d852c5b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d852c5b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d852c5b9

Branch: refs/heads/master
Commit: d852c5b9391a7dc6d9eea21f8a4e0905d2cd7b28
Parents: 5fab1c5
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:02:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/WindowingHelpers.java     | 38 +++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d852c5b9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d1..ec94f3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ public final class WindowingHelpers {
   }
 
   /**
-   * A function for converting a value to a {@link WindowedValue}. The resulting
-   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in the global window, and will have the default timestamp == MIN
    * and pane.
    *
    * @param <T>   The type of the object.
@@ -40,13 +40,13 @@ public final class WindowingHelpers {
     return new Function<T, WindowedValue<T>>() {
       @Override
       public WindowedValue<T> call(T t) {
-        return WindowedValue.valueInEmptyWindows(t);
+        return WindowedValue.valueInGlobalWindow(t);
       }
     };
   }
 
   /**
-   * A function for extracting the value from a {@link WindowedValue}.
+   * A Spark function for extracting the value from a {@link WindowedValue}.
    *
    * @param <T>   The type of the object.
    * @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public final class WindowingHelpers {
       }
     };
   }
+
+  /**
+   * Same as windowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+    return new com.google.common.base.Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> apply(T t) {
+        return WindowedValue.valueInGlobalWindow(t);
+      }
+    };
+  }
+
+  /**
+   * Same as unwindowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+    return new com.google.common.base.Function<WindowedValue<T>, T>() {
+      @Override
+      public T apply(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
 }