You are viewing a plain-text version of this content. (The canonical link, which appeared as "here" in the original page, is not preserved in this plain-text version.)
Posted to commits@beam.apache.org by av...@apache.org on 2017/03/26 08:24:11 UTC
[1/2] beam git commit: [BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs
Repository: beam
Updated Branches:
refs/heads/master 348d33588 -> c9e55a436
[BEAM-1810] Replace usage of RDD#isEmpty on non-serialized RDDs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b32f0482
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b32f0482
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b32f0482
Branch: refs/heads/master
Commit: b32f0482784b9df7ce67226b32febe6e664a45b6
Parents: 348d335
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Mar 25 21:49:06 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Mar 26 10:31:40 2017 +0300
----------------------------------------------------------------------
.../translation/GroupCombineFunctions.java | 15 ++++++-----
.../spark/translation/TransformTranslator.java | 26 ++++++++++++--------
2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index b2a589d..917a9ee 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -18,8 +18,7 @@
package org.apache.beam.runners.spark.translation;
-import static com.google.common.base.Preconditions.checkArgument;
-
+import com.google.common.base.Optional;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
@@ -67,14 +66,12 @@ public class GroupCombineFunctions {
/**
* Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation.
*/
- public static <InputT, AccumT> Iterable<WindowedValue<AccumT>> combineGlobally(
+ public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally(
JavaRDD<WindowedValue<InputT>> rdd,
final SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn,
final Coder<InputT> iCoder,
final Coder<AccumT> aCoder,
final WindowingStrategy<?, ?> windowingStrategy) {
- checkArgument(!rdd.isEmpty(), "CombineGlobally computation should be skipped for empty RDDs.");
-
// coders.
final WindowedValue.FullWindowedValueCoder<InputT> wviCoder =
WindowedValue.FullWindowedValueCoder.of(iCoder,
@@ -93,6 +90,11 @@ public class GroupCombineFunctions {
//---- AccumT: A
//---- InputT: I
JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
+
+ if (inputRDDBytes.isEmpty()) {
+ return Optional.absent();
+ }
+
/*Itr<WV<A>>*/ byte[] accumulatedBytes = inputRDDBytes.aggregate(
CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
@@ -115,7 +117,8 @@ public class GroupCombineFunctions {
}
}
);
- return CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder);
+
+ return Optional.of(CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/b32f0482/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b4362b0..ffb207a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -27,6 +27,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh
import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
+import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -259,9 +260,20 @@ public final class TransformTranslator {
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
JavaRDD<WindowedValue<OutputT>> outRdd;
- // handle empty input RDD, which will naturally skip the entire execution
- // as Spark will not run on empty RDDs.
- if (inRdd.isEmpty()) {
+
+ Optional<Iterable<WindowedValue<AccumT>>> maybeAccumulated =
+ GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, iCoder, aCoder,
+ windowingStrategy);
+
+ if (maybeAccumulated.isPresent()) {
+ Iterable<WindowedValue<OutputT>> output =
+ sparkCombineFn.extractOutput(maybeAccumulated.get());
+ outRdd = context.getSparkContext()
+ .parallelize(CoderHelpers.toByteArrays(output, wvoCoder))
+ .map(CoderHelpers.fromByteFunction(wvoCoder));
+ } else {
+ // handle empty input RDD, which will naturally skip the entire execution
+ // as Spark will not run on empty RDDs.
JavaSparkContext jsc = new JavaSparkContext(inRdd.context());
if (hasDefault) {
OutputT defaultValue = combineFn.defaultValue();
@@ -272,14 +284,8 @@ public final class TransformTranslator {
} else {
outRdd = jsc.emptyRDD();
}
- } else {
- Iterable<WindowedValue<AccumT>> accumulated = GroupCombineFunctions.combineGlobally(
- inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy);
- Iterable<WindowedValue<OutputT>> output = sparkCombineFn.extractOutput(accumulated);
- outRdd = context.getSparkContext()
- .parallelize(CoderHelpers.toByteArrays(output, wvoCoder))
- .map(CoderHelpers.fromByteFunction(wvoCoder));
}
+
context.putDataset(transform, new BoundedDataset<>(outRdd));
}
[2/2] beam git commit: This closes #2328
Posted by av...@apache.org.
This closes #2328
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9e55a43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9e55a43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9e55a43
Branch: refs/heads/master
Commit: c9e55a4360a9fe06d6ed943a222bce524a6b10af
Parents: 348d335 b32f048
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Mar 26 11:23:45 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Mar 26 11:23:45 2017 +0300
----------------------------------------------------------------------
.../translation/GroupCombineFunctions.java | 15 ++++++-----
.../spark/translation/TransformTranslator.java | 26 ++++++++++++--------
2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------