You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/04/19 21:40:28 UTC

[1/5] incubator-beam git commit: Materialize PCollection/RDD as windowed values with the appropriate windows.

Repository: incubator-beam
Updated Branches:
  refs/heads/master f20bf8afd -> 135cb733f


Materialize PCollection/RDD as windowed values with the appropriate windows.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ca3b30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ca3b30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ca3b30d

Branch: refs/heads/master
Commit: 1ca3b30dd5679e75ce9e35dc08cc0012fb899186
Parents: d852c5b
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:05:14 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    | 62 ++++++++++----------
 1 file changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ca3b30d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 78a62aa..531a6ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -37,6 +37,9 @@ import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -76,12 +79,13 @@ public class EvaluationContext implements EvaluationResult {
    */
   private class RDDHolder<T> {
 
-    private Iterable<T> values;
+    private Iterable<WindowedValue<T>> windowedValues;
     private Coder<T> coder;
     private JavaRDDLike<WindowedValue<T>, ?> rdd;
 
     RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
+      this.windowedValues =
+          Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
       this.coder = coder;
     }
 
@@ -91,14 +95,6 @@ public class EvaluationContext implements EvaluationResult {
 
     JavaRDDLike<WindowedValue<T>, ?> getRDD() {
       if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(T t) {
-             // TODO: this is wrong if T is a TimestampedValue
-              return WindowedValue.valueInEmptyWindows(t);
-            }
-        });
         WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
             WindowedValue.getValueOnlyCoder(coder);
         rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
@@ -107,29 +103,31 @@ public class EvaluationContext implements EvaluationResult {
       return rdd;
     }
 
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
+    Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+      if (windowedValues == null) {
+        WindowFn<?, ?> windowFn =
+                pcollection.getWindowingStrategy().getWindowFn();
+        Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+        final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+            if (windowFn instanceof GlobalWindows) {
+              windowedValueCoder =
+                  WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+            } else {
+              windowedValueCoder =
+                  WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+            }
+        JavaRDDLike<byte[], ?> bytesRDD =
+            rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
         List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
+        windowedValues = Iterables.transform(clientBytes,
+            new Function<byte[], WindowedValue<T>>() {
           @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
+          public WindowedValue<T> apply(byte[] bytes) {
+            return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
           }
         });
       }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
-        }
-      });
+      return windowedValues;
     }
   }
 
@@ -264,15 +262,15 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
+    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
+    return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
+    return rddHolder.getValues(pcollection);
   }
 
   @Override


[2/5] incubator-beam git commit: Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD) windowing functions

Posted by am...@apache.org.
Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD)
windowing functions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d852c5b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d852c5b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d852c5b9

Branch: refs/heads/master
Commit: d852c5b9391a7dc6d9eea21f8a4e0905d2cd7b28
Parents: 5fab1c5
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:02:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/WindowingHelpers.java     | 38 +++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d852c5b9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d1..ec94f3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ public final class WindowingHelpers {
   }
 
   /**
-   * A function for converting a value to a {@link WindowedValue}. The resulting
-   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in the global window, and will have the default (minimum) timestamp
    * and pane.
    *
    * @param <T>   The type of the object.
@@ -40,13 +40,13 @@ public final class WindowingHelpers {
     return new Function<T, WindowedValue<T>>() {
       @Override
       public WindowedValue<T> call(T t) {
-        return WindowedValue.valueInEmptyWindows(t);
+        return WindowedValue.valueInGlobalWindow(t);
       }
     };
   }
 
   /**
-   * A function for extracting the value from a {@link WindowedValue}.
+   * A Spark function for extracting the value from a {@link WindowedValue}.
    *
    * @param <T>   The type of the object.
    * @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public final class WindowingHelpers {
       }
     };
   }
+
+  /**
+   * Same as windowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+    return new com.google.common.base.Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> apply(T t) {
+        return WindowedValue.valueInGlobalWindow(t);
+      }
+    };
+  }
+
+  /**
+   * Same as unwindowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+    return new com.google.common.base.Function<WindowedValue<T>, T>() {
+      @Override
+      public T apply(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
 }


[3/5] incubator-beam git commit: Replace valueInEmptyWindows with valueInGlobalWindow

Posted by am...@apache.org.
Replace valueInEmptyWindows with valueInGlobalWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fab1c5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fab1c5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fab1c5f

Branch: refs/heads/master
Commit: 5fab1c5f828395024ff4add85a174ffdfbfce916
Parents: f20bf8a
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 22:01:15 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../org/apache/beam/runners/spark/translation/DoFnFunction.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fab1c5f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 674da73..fbc9e98 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -75,7 +75,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
     @Override
     public synchronized void output(O o) {
       outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInEmptyWindows(o));
+          WindowedValue.valueInGlobalWindow(o));
     }
 
     @Override


[5/5] incubator-beam git commit: [BEAM-189] The Spark runner uses valueInEmptyWindows, which causes values to be dropped

Posted by am...@apache.org.
[BEAM-189] The Spark runner uses valueInEmptyWindows, which causes values to be dropped

This closes #179


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/135cb733
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/135cb733
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/135cb733

Branch: refs/heads/master
Commit: 135cb733f94586b9dfc0c80d50979c1bfc91ba97
Parents: f20bf8a 932e5b4
Author: Sela <an...@paypal.com>
Authored: Tue Apr 19 22:31:07 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:31:07 2016 +0300

----------------------------------------------------------------------
 .../runners/spark/translation/DoFnFunction.java |  2 +-
 .../spark/translation/EvaluationContext.java    | 59 ++++++++++----------
 .../spark/translation/WindowingHelpers.java     | 38 +++++++++++--
 .../beam/runners/spark/SimpleWordCountTest.java | 35 +++++++++++-
 4 files changed, 97 insertions(+), 37 deletions(-)
----------------------------------------------------------------------



[4/5] incubator-beam git commit: Add unit test for TextIO output to support the mvn exec:exec example we provide in README

Posted by am...@apache.org.
Add a unit test for TextIO output to support the `mvn exec:exec` example provided in the README

Satisfy checkstyle


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/932e5b49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/932e5b49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/932e5b49

Branch: refs/heads/master
Commit: 932e5b4976e2c5bf388768f7d3cdeb4da4e84e71
Parents: 1ca3b30
Author: Sela <an...@paypal.com>
Authored: Thu Apr 14 23:18:24 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Tue Apr 19 22:23:17 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |  3 +-
 .../beam/runners/spark/SimpleWordCountTest.java | 35 ++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 531a6ce..6d49bd3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -262,7 +262,8 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
     Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
     return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/932e5b49/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index faa4dbf..4e9c0b8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -31,11 +32,18 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
+import org.junit.Rule;
+import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
-import org.junit.Test;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -50,7 +58,7 @@ public class SimpleWordCountTest {
       ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
   @Test
-  public void testRun() throws Exception {
+  public void testInMem() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -64,6 +72,29 @@ public class SimpleWordCountTest {
     res.close();
   }
 
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  @Test
+  public void testOutputFile() throws Exception {
+    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
+        .of());
+    PCollection<String> output = inputWords.apply(new CountWords());
+
+    File outputFile = testFolder.newFile();
+    output.apply(
+        TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
+        containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
+  }
+
   /**
    * A DoFn that tokenizes lines of text into individual words.
    */