You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/07 14:27:22 UTC

[1/2] beam git commit: [BEAM-1636] UnboundedDataset action() does not materialize RDD

Repository: beam
Updated Branches:
  refs/heads/master 1fd52f53c -> c79bd95bd


[BEAM-1636] UnboundedDataset action() does not materialize RDD


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a889597e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a889597e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a889597e

Branch: refs/heads/master
Commit: a889597e748eb752141af8dc568c56449c4eba5c
Parents: 1fd52f5
Author: Aviem Zur <av...@gmail.com>
Authored: Tue Mar 7 15:07:03 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Tue Mar 7 15:07:03 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/translation/BoundedDataset.java   |  7 +------
 .../beam/runners/spark/translation/TranslationUtils.java |  8 ++++++++
 .../spark/translation/streaming/UnboundedDataset.java    | 11 ++++++++++-
 3 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 7db04a8..6e4ffc7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 
 /**
@@ -106,11 +105,7 @@ public class BoundedDataset<T> implements Dataset {
   @Override
   public void action() {
     // Empty function to force computation of RDD.
-    rdd.foreach(new VoidFunction<WindowedValue<T>>() {
-      @Override public void call(WindowedValue<T> tWindowedValue) throws Exception {
-        // Empty implementation.
-      }
-    });
+    rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index f2b3418..8545b36 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 
@@ -267,4 +268,11 @@ public final class TranslationUtils {
     }
   }
 
+  public static <T> VoidFunction<T> emptyVoidFunction() {
+    return new VoidFunction<T>() {
+      @Override public void call(T t) throws Exception {
+        // Empty implementation.
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index e9abe93..ccdaf11 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -21,7 +21,10 @@ package org.apache.beam.runners.spark.translation.streaming;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.spark.translation.Dataset;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,11 +71,17 @@ public class UnboundedDataset<T> implements Dataset {
   @Override
   public void action() {
     // Force computation of DStream.
-    dStream.dstream().register();
+    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
+      @Override
+      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
+      }
+    });
   }
 
   @Override
   public void setName(String name) {
     // ignore
   }
+
 }


[2/2] beam git commit: This closes #2180

Posted by am...@apache.org.
This closes #2180


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c79bd95b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c79bd95b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c79bd95b

Branch: refs/heads/master
Commit: c79bd95bddab2329ef6117409621a3e00de84815
Parents: 1fd52f5 a889597
Author: Amit Sela <am...@gmail.com>
Authored: Tue Mar 7 15:13:54 2017 +0200
Committer: Amit Sela <am...@gmail.com>
Committed: Tue Mar 7 15:13:54 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/translation/BoundedDataset.java   |  7 +------
 .../beam/runners/spark/translation/TranslationUtils.java |  8 ++++++++
 .../spark/translation/streaming/UnboundedDataset.java    | 11 ++++++++++-
 3 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------