You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/03/06 09:49:12 UTC
[1/2] beam git commit: [BEAM-1623] Transform Reshuffle directly in Spark runner
Repository: beam
Updated Branches:
refs/heads/master 69d951225 -> 34b38ef95
[BEAM-1623] Transform Reshuffle directly in Spark runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8bc618e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8bc618e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8bc618e
Branch: refs/heads/master
Commit: d8bc618edafd07ae8e0ec692fc7f3df7395b876e
Parents: 69d9512
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Mar 5 07:15:32 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Mon Mar 6 11:19:22 2017 +0200
----------------------------------------------------------------------
.../translation/GroupCombineFunctions.java | 22 ++++++++++++
.../spark/translation/TransformTranslator.java | 38 +++++++++++++++-----
.../spark/translation/TranslationUtils.java | 10 ++++++
.../streaming/StreamingTransformTranslator.java | 36 +++++++++++++++++++
4 files changed, 97 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 1e879ce..b2a589d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -203,4 +203,26 @@ public class GroupCombineFunctions {
return accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder));
}
+
+ /**
+ * An implementation of
+ * {@link org.apache.beam.sdk.util.Reshuffle} for the Spark runner.
+ */
+ public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle(
+ JavaRDD<WindowedValue<KV<K, V>>> rdd,
+ Coder<K> keyCoder,
+ WindowedValueCoder<V> wvCoder) {
+
+ // Use coders to convert objects in the PCollection to byte arrays, so they
+ // can be transferred over the network for the shuffle.
+ return rdd
+ .map(new ReifyTimestampsAndWindowsFunction<K, V>())
+ .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
+ .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
+ .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
+ .repartition(rdd.getNumPartitions())
+ .mapToPair(CoderHelpers.fromByteFunction(keyCoder, wvCoder))
+ .map(TranslationUtils.<K, WindowedValue<V>>fromPairFunction())
+ .map(TranslationUtils.<K, V>toKVByWindowInValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a4939b9..0ae7313 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -318,15 +319,7 @@ public final class TransformTranslator {
return sparkCombineFn.extractOutput(iter);
}
}).map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction())
- .map(new Function<KV<K, WindowedValue<OutputT>>,
- WindowedValue<KV<K, OutputT>>>() {
- @Override
- public WindowedValue<KV<K, OutputT>> call(
- KV<K, WindowedValue<OutputT>> kv) throws Exception {
- WindowedValue<OutputT> wv = kv.getValue();
- return wv.withValue(KV.of(kv.getKey(), wv.getValue()));
- }
- });
+ .map(TranslationUtils.<K, OutputT>toKVByWindowInValue());
context.putDataset(transform, new BoundedDataset<>(outRdd));
}
@@ -735,6 +728,32 @@ public final class TransformTranslator {
};
}
+ private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
+ return new TransformEvaluator<Reshuffle<K, V>>() {
+ @Override public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
+ @SuppressWarnings("unchecked")
+ JavaRDD<WindowedValue<KV<K, V>>> inRDD =
+ ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD();
+ @SuppressWarnings("unchecked")
+ final WindowingStrategy<?, W> windowingStrategy =
+ (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
+ @SuppressWarnings("unchecked")
+ final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+ @SuppressWarnings("unchecked")
+ final WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
+
+ final Coder<K> keyCoder = coder.getKeyCoder();
+ final WindowedValue.WindowedValueCoder<V> wvCoder =
+ WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
+
+ JavaRDD<WindowedValue<KV<K, V>>> reshuffled =
+ GroupCombineFunctions.reshuffle(inRDD, keyCoder, wvCoder);
+
+ context.putDataset(transform, new BoundedDataset<>(reshuffled));
+ }
+ };
+ }
+
private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
.newHashMap();
@@ -753,6 +772,7 @@ public final class TransformTranslator {
EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
EVALUATORS.put(Window.Assign.class, window());
+ EVALUATORS.put(Reshuffle.class, reshuffle());
// mostly test evaluators
EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 158593e..f2b3418 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -159,6 +159,16 @@ public final class TranslationUtils {
};
}
+ /** Extract window from a {@link KV} with {@link WindowedValue} value. */
+ static <K, V> Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K, V>>> toKVByWindowInValue() {
+ return new Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K, V>>>() {
+ @Override public WindowedValue<KV<K, V>> call(KV<K, WindowedValue<V>> kv) throws Exception {
+ WindowedValue<V> wv = kv.getValue();
+ return wv.withValue(KV.of(kv.getKey(), wv.getValue()));
+ }
+ };
+ }
+
/**
* A utility class to filter {@link TupleTag}s.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 628b713..31307cc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -73,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -445,6 +446,40 @@ final class StreamingTransformTranslator {
};
}
+ private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
+ return new TransformEvaluator<Reshuffle<K, V>>() {
+ @Override
+ public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
+ @SuppressWarnings("unchecked") UnboundedDataset<KV<K, V>> inputDataset =
+ (UnboundedDataset<KV<K, V>>) context.borrowDataset(transform);
+ List<Integer> streamSources = inputDataset.getStreamSources();
+ JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+ @SuppressWarnings("unchecked")
+ final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+ @SuppressWarnings("unchecked")
+ final WindowingStrategy<?, W> windowingStrategy =
+ (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
+ @SuppressWarnings("unchecked")
+ final WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
+
+ final WindowedValue.WindowedValueCoder<V> wvCoder =
+ WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
+
+ JavaDStream<WindowedValue<KV<K, V>>> reshuffledStream =
+ dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
+ JavaRDD<WindowedValue<KV<K, V>>>>() {
+ @Override
+ public JavaRDD<WindowedValue<KV<K, V>>> call(
+ JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
+ return GroupCombineFunctions.reshuffle(rdd, coder.getKeyCoder(), wvCoder);
+ }
+ });
+
+ context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources));
+ }
+ };
+ }
+
private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
Maps.newHashMap();
@@ -457,6 +492,7 @@ final class StreamingTransformTranslator {
EVALUATORS.put(CreateStream.class, createFromQueue());
EVALUATORS.put(Window.Assign.class, window());
EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
+ EVALUATORS.put(Reshuffle.class, reshuffle());
}
/**
[2/2] beam git commit: This closes #2160
Posted by am...@apache.org.
This closes #2160
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34b38ef9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34b38ef9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34b38ef9
Branch: refs/heads/master
Commit: 34b38ef952b1981e82c0c74e0ee22b3d570169d4
Parents: 69d9512 d8bc618
Author: Sela <an...@paypal.com>
Authored: Mon Mar 6 11:19:48 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Mon Mar 6 11:19:48 2017 +0200
----------------------------------------------------------------------
.../translation/GroupCombineFunctions.java | 22 ++++++++++++
.../spark/translation/TransformTranslator.java | 38 +++++++++++++++-----
.../spark/translation/TranslationUtils.java | 10 ++++++
.../streaming/StreamingTransformTranslator.java | 36 +++++++++++++++++++
4 files changed, 97 insertions(+), 9 deletions(-)
----------------------------------------------------------------------