You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/03/26 08:24:11 UTC

[1/2] beam git commit: [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs

Repository: beam
Updated Branches:
  refs/heads/master 348d33588 -> c9e55a436


[BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482

Branch: refs/heads/master
Commit: b32f0482784b9df7ce67226b32febe6e664a45b6
Parents: 348d335
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Mar 25 21:49:06 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Mar 26 10:31:40 2017 +0300

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      | 15 ++++++-----
 .../spark/translation/TransformTranslator.java  | 26 ++++++++++++--------
 2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index b2a589d..917a9ee 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -18,8 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
+import com.google.common.base.Optional;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
@@ -67,14 +66,12 @@ public class GroupCombineFunctions {
   /**
    * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation.
    */
-  public static <InputT, AccumT> Iterable<WindowedValue<AccumT>> combineGlobally(
+  public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally(
       JavaRDD<WindowedValue<InputT>> rdd,
       final SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn,
       final Coder<InputT> iCoder,
       final Coder<AccumT> aCoder,
       final WindowingStrategy<?, ?> windowingStrategy) {
-    checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be skipped for empty RDDs.");
-
     // coders.
     final WindowedValue.FullWindowedValueCoder<InputT> wviCoder =
         WindowedValue.FullWindowedValueCoder.of(iCoder,
@@ -93,6 +90,11 @@ public class GroupCombineFunctions {
     //---- AccumT: A
     //---- InputT: I
     JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
+
+    if (inputRDDBytes.isEmpty()) {
+      return Optional.absent();
+    }
+
     /*Itr<WV<A>>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate(
         CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
         new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
@@ -115,7 +117,8 @@ public class GroupCombineFunctions {
           }
         }
     );
-    return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder);
+
+    return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b4362b0..ffb207a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -27,6 +27,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -259,9 +260,20 @@ public final class TransformTranslator {
                 ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
 
             JavaRDD<WindowedValue<OutputT>> outRdd;
-            // handle empty input RDD, which will naturally skip the entire execution
-            // as Spark will not run on empty RDDs.
-            if (inRdd.isEmpty()) {
+
+            Optional<Iterable<WindowedValue<AccumT>>> maybeAccumulated =
+                GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, iCoder, aCoder,
+                    windowingStrategy);
+
+            if (maybeAccumulated.isPresent()) {
+              Iterable<WindowedValue<OutputT>> output =
+                  sparkCombineFn.extractOutput(maybeAccumulated.get());
+              outRdd = context.getSparkContext()
+                  .parallelize(CoderHelpers.toByteArrays(output, wvoCoder))
+                  .map(CoderHelpers.fromByteFunction(wvoCoder));
+            } else {
+              // Handle an empty input RDD: Spark runs nothing on an empty RDD, so the
+              // combine step is skipped and the output is built directly below.
               JavaSparkContext jsc = new JavaSparkContext(inRdd.context());
               if (hasDefault) {
                 OutputT defaultValue = combineFn.defaultValue();
@@ -272,14 +284,8 @@ public final class TransformTranslator {
               } else {
                 outRdd = jsc.emptyRDD();
               }
-            } else {
-              Iterable<WindowedValue<AccumT>> accumulated = GroupCombineFunctions.combineGlobally(
-                  inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy);
-              Iterable<WindowedValue<OutputT>> output = sparkCombineFn.extractOutput(accumulated);
-              outRdd = context.getSparkContext()
-                  .parallelize(CoderHelpers.toByteArrays(output, wvoCoder))
-                  .map(CoderHelpers.fromByteFunction(wvoCoder));
             }
+
             context.putDataset(transform, new BoundedDataset<>(outRdd));
           }
 


[2/2] beam git commit: This closes #2328

Posted by av...@apache.org.
This closes #2328


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9e55a43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9e55a43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9e55a43

Branch: refs/heads/master
Commit: c9e55a4360a9fe06d6ed943a222bce524a6b10af
Parents: 348d335 b32f048
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Mar 26 11:23:45 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Mar 26 11:23:45 2017 +0300

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      | 15 ++++++-----
 .../spark/translation/TransformTranslator.java  | 26 ++++++++++++--------
 2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------