You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/04/19 21:40:28 UTC
[1/5] incubator-beam git commit: Materialize PCollection/RDD as
windowed values with the appropriate windows.
Repository: incubator-beam
Updated Branches:
refs/heads/master f20bf8afd -> 135cb733f
Materialize PCollection/RDD as windowed values with the appropriate windows.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ca3b30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ca3b30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ca3b30d
Branch: refs/heads/master
Commit: 1ca3b30dd5679e75ce9e35dc08cc0012fb899186
Parents: d852c5b
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:05:14 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300
----------------------------------------------------------------------
.../spark/translation/EvaluationContext.java | 62 ++++++++++----------
1 file changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ca3b30d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 78a62aa..531a6ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -37,6 +37,9 @@ import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -76,12 +79,13 @@ public class EvaluationContext implements EvaluationResult {
*/
private class RDDHolder<T> {
- private Iterable<T> values;
+ private Iterable<WindowedValue<T>> windowedValues;
private Coder<T> coder;
private JavaRDDLike<WindowedValue<T>, ?> rdd;
RDDHolder(Iterable<T> values, Coder<T> coder) {
- this.values = values;
+ this.windowedValues =
+ Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
this.coder = coder;
}
@@ -91,14 +95,6 @@ public class EvaluationContext implements EvaluationResult {
JavaRDDLike<WindowedValue<T>, ?> getRDD() {
if (rdd == null) {
- Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
- new Function<T, WindowedValue<T>>() {
- @Override
- public WindowedValue<T> apply(T t) {
- // TODO: this is wrong if T is a TimestampedValue
- return WindowedValue.valueInEmptyWindows(t);
- }
- });
WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
WindowedValue.getValueOnlyCoder(coder);
rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
@@ -107,29 +103,31 @@ public class EvaluationContext implements EvaluationResult {
return rdd;
}
- Iterable<T> getValues(PCollection<T> pcollection) {
- if (values == null) {
- coder = pcollection.getCoder();
- JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
- .map(CoderHelpers.toByteFunction(coder));
+ Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+ if (windowedValues == null) {
+ WindowFn<?, ?> windowFn =
+ pcollection.getWindowingStrategy().getWindowFn();
+ Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+ final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+ if (windowFn instanceof GlobalWindows) {
+ windowedValueCoder =
+ WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+ } else {
+ windowedValueCoder =
+ WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+ }
+ JavaRDDLike<byte[], ?> bytesRDD =
+ rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
List<byte[]> clientBytes = bytesRDD.collect();
- values = Iterables.transform(clientBytes, new Function<byte[], T>() {
+ windowedValues = Iterables.transform(clientBytes,
+ new Function<byte[], WindowedValue<T>>() {
@Override
- public T apply(byte[] bytes) {
- return CoderHelpers.fromByteArray(bytes, coder);
+ public WindowedValue<T> apply(byte[] bytes) {
+ return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
}
});
}
- return values;
- }
-
- Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
- return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
- @Override
- public WindowedValue<T> apply(T t) {
- return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
- }
- });
+ return windowedValues;
}
}
@@ -264,15 +262,15 @@ public class EvaluationContext implements EvaluationResult {
@Override
public <T> Iterable<T> get(PCollection<T> pcollection) {
- @SuppressWarnings("unchecked")
- RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
- return rddHolder.getValues(pcollection);
+ @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+ Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
+ return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
}
<T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
@SuppressWarnings("unchecked")
RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
- return rddHolder.getWindowedValues(pcollection);
+ return rddHolder.getValues(pcollection);
}
@Override
[2/5] incubator-beam git commit: Replace valueInEmptyWindows with
valueInGlobalWindow in Spark Function,
and add per-value (non-RDD) windowing functions
Posted by am...@apache.org.
Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD)
windowing functions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d852c5b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d852c5b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d852c5b9
Branch: refs/heads/master
Commit: d852c5b9391a7dc6d9eea21f8a4e0905d2cd7b28
Parents: 5fab1c5
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:02:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300
----------------------------------------------------------------------
.../spark/translation/WindowingHelpers.java | 38 +++++++++++++++++---
1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d852c5b9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d1..ec94f3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ public final class WindowingHelpers {
}
/**
- * A function for converting a value to a {@link WindowedValue}. The resulting
- * {@link WindowedValue} will be in no windows, and will have the default timestamp
+ * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+ * {@link WindowedValue} will be in the global window, and will have the default (minimum) timestamp
+ * and the default pane.
*
* @param <T> The type of the object.
@@ -40,13 +40,13 @@ public final class WindowingHelpers {
return new Function<T, WindowedValue<T>>() {
@Override
public WindowedValue<T> call(T t) {
- return WindowedValue.valueInEmptyWindows(t);
+ return WindowedValue.valueInGlobalWindow(t);
}
};
}
/**
- * A function for extracting the value from a {@link WindowedValue}.
+ * A Spark function for extracting the value from a {@link WindowedValue}.
*
* @param <T> The type of the object.
* @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public final class WindowingHelpers {
}
};
}
+
+ /**
+ * Same as {@code windowFunction}, but for individual (non-RDD) values - not an RDD
+ * transformation! Returns a Guava function that wraps each value via
+ * {@link WindowedValue#valueInGlobalWindow}, so every result is in the global window.
+ *
+ * @param <T> The type of the object.
+ * @return A function that accepts an object and returns its {@link WindowedValue}.
+ */
+ public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+ return new com.google.common.base.Function<T, WindowedValue<T>>() {
+ @Override
+ public WindowedValue<T> apply(T t) {
+ return WindowedValue.valueInGlobalWindow(t);
+ }
+ };
+ }
+
+ /**
+ * Same as {@code unwindowFunction}, but for individual (non-RDD) values - not an RDD
+ * transformation! Returns a Guava function that unwraps each {@link WindowedValue},
+ * returning only its value (via {@link WindowedValue#getValue()}).
+ *
+ * @param <T> The type of the object.
+ * @return A function that accepts a {@link WindowedValue} and returns its value.
+ */
+ public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+ return new com.google.common.base.Function<WindowedValue<T>, T>() {
+ @Override
+ public T apply(WindowedValue<T> t) {
+ return t.getValue();
+ }
+ };
+ }
}
[3/5] incubator-beam git commit: Replace valueInEmptyWindows with
valueInGlobalWindow
Posted by am...@apache.org.
Replace valueInEmptyWindows with valueInGlobalWindow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fab1c5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fab1c5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fab1c5f
Branch: refs/heads/master
Commit: 5fab1c5f828395024ff4add85a174ffdfbfce916
Parents: f20bf8a
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:01:15 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300
----------------------------------------------------------------------
.../org/apache/beam/runners/spark/translation/DoFnFunction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fab1c5f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 674da73..fbc9e98 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -75,7 +75,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
@Override
public synchronized void output(O o) {
outputs.add(windowedValue != null ? windowedValue.withValue(o) :
- WindowedValue.valueInEmptyWindows(o));
+ WindowedValue.valueInGlobalWindow(o));
}
@Override
[5/5] incubator-beam git commit: [BEAM-189] The Spark runner uses
valueInEmptyWindows, which causes values to be dropped
Posted by am...@apache.org.
[BEAM-189] The Spark runner uses valueInEmptyWindows, which causes values to be dropped
This closes #179
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/135cb733
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/135cb733
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/135cb733
Branch: refs/heads/master
Commit: 135cb733f94586b9dfc0c80d50979c1bfc91ba97
Parents: f20bf8a 932e5b4
Author: Sela <an...@paypal.com>
Authored: Tue Apr 19 22:31:07 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:31:07 2016 +0300
----------------------------------------------------------------------
.../runners/spark/translation/DoFnFunction.java | 2 +-
.../spark/translation/EvaluationContext.java | 59 ++++++++++----------
.../spark/translation/WindowingHelpers.java | 38 +++++++++++--
.../beam/runners/spark/SimpleWordCountTest.java | 35 +++++++++++-
4 files changed, 97 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
[4/5] incubator-beam git commit: Add unit test for TextIO output to
support the mvn exec:exec example we provide in README
Posted by am...@apache.org.
Add unit test for TextIO output to support the mvn exec:exec example we provide in the README
Satisfy checkstyle
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/932e5b49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/932e5b49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/932e5b49
Branch: refs/heads/master
Commit: 932e5b4976e2c5bf388768f7d3cdeb4da4e84e71
Parents: 1ca3b30
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 23:18:24 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:23:17 2016 +0300
----------------------------------------------------------------------
.../spark/translation/EvaluationContext.java | 3 +-
.../beam/runners/spark/SimpleWordCountTest.java | 35 ++++++++++++++++++--
2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 531a6ce..6d49bd3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -262,7 +262,8 @@ public class EvaluationContext implements EvaluationResult {
@Override
public <T> Iterable<T> get(PCollection<T> pcollection) {
- @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+ @SuppressWarnings("unchecked")
+ RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index faa4dbf..4e9c0b8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
@@ -31,11 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
+import org.junit.Rule;
+import org.junit.Test;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
-import org.junit.Test;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -50,7 +58,7 @@ public class SimpleWordCountTest {
ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
@Test
- public void testRun() throws Exception {
+ public void testInMem() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
@@ -64,6 +72,29 @@ public class SimpleWordCountTest {
res.close();
}
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
+ @Test
+ public void testOutputFile() throws Exception {
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkPipelineRunner.class);
+ Pipeline p = Pipeline.create(options);
+ PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
+ .of());
+ PCollection<String> output = inputWords.apply(new CountWords());
+
+ File outputFile = testFolder.newFile();
+ output.apply(
+ TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+
+ EvaluationResult res = SparkPipelineRunner.create().run(p);
+ res.close();
+
+ assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
+ containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
+ }
+
/**
* A DoFn that tokenizes lines of text into individual words.
*/