You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/04/19 21:40:29 UTC
[2/5] incubator-beam git commit: Replace valueInEmptyWindows with
valueInGlobalWindow in Spark Function,
and add per-value (non-RDD) windowing functions
Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD)
windowing functions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d852c5b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d852c5b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d852c5b9
Branch: refs/heads/master
Commit: d852c5b9391a7dc6d9eea21f8a4e0905d2cd7b28
Parents: 5fab1c5
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:02:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300
----------------------------------------------------------------------
.../spark/translation/WindowingHelpers.java | 38 +++++++++++++++++---
1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d852c5b9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d1..ec94f3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ public final class WindowingHelpers {
}
/**
- * A function for converting a value to a {@link WindowedValue}. The resulting
- * {@link WindowedValue} will be in no windows, and will have the default timestamp
+ * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+ * {@link WindowedValue} will be in a global windows, and will have the default timestamp == MIN
* and pane.
*
* @param <T> The type of the object.
@@ -40,13 +40,13 @@ public final class WindowingHelpers {
return new Function<T, WindowedValue<T>>() {
@Override
public WindowedValue<T> call(T t) {
- return WindowedValue.valueInEmptyWindows(t);
+ return WindowedValue.valueInGlobalWindow(t);
}
};
}
/**
- * A function for extracting the value from a {@link WindowedValue}.
+ * A Spark function for extracting the value from a {@link WindowedValue}.
*
* @param <T> The type of the object.
* @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public final class WindowingHelpers {
}
};
}
+
+ /**
+ * Same as windowFunction but for non-RDD values - not an RDD transformation!
+ *
+ * @param <T> The type of the object.
+ * @return A function that accepts an object and returns its {@link WindowedValue}.
+ */
+ public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+ return new com.google.common.base.Function<T, WindowedValue<T>>() {
+ @Override
+ public WindowedValue<T> apply(T t) {
+ return WindowedValue.valueInGlobalWindow(t);
+ }
+ };
+ }
+
+ /**
+ * Same as unwindowFunction but for non-RDD values - not an RDD transformation!
+ *
+ * @param <T> The type of the object.
+ * @return A function that accepts an object and returns its {@link WindowedValue}.
+ */
+ public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+ return new com.google.common.base.Function<WindowedValue<T>, T>() {
+ @Override
+ public T apply(WindowedValue<T> t) {
+ return t.getValue();
+ }
+ };
+ }
}