You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/26 07:34:09 UTC
[1/2] incubator-beam git commit: [BEAM-799] Support GroupByKey directly.
Repository: incubator-beam
Updated Branches:
refs/heads/master f2fe1ae46 -> 53fe3ee42
[BEAM-799] Support GroupByKey directly.
Remove runner override for GroupByKey.
Avoid NPE if no sideInputs are available in reader.
Handle CombineFn with or without context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a54ded37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a54ded37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a54ded37
Branch: refs/heads/master
Commit: a54ded373fa7f6508fb46eea1a1d6f9bc405114b
Parents: f2fe1ae
Author: Sela <an...@paypal.com>
Authored: Sat Oct 22 14:51:50 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 10:00:45 2016 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 19 ------
.../translation/GroupCombineFunctions.java | 66 +++++++++-----------
.../spark/translation/TransformTranslator.java | 43 +++----------
.../streaming/StreamingTransformTranslator.java | 65 +++++--------------
.../spark/util/SparkSideInputReader.java | 2 +-
5 files changed, 55 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index b17c38c..45c7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PBegin;
@@ -115,23 +113,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
}
/**
- * Overrides for this runner.
- */
- @SuppressWarnings("rawtypes")
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
-
- if (transform instanceof GroupByKey) {
- return (OutputT) ((PCollection) input).apply(
- new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
- } else {
- return super.apply(transform, input);
- }
- }
-
-
- /**
* No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
* thread.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index de02b26..e2a0f87 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -20,11 +20,9 @@ package org.apache.beam.runners.spark.translation;
import com.google.common.collect.Lists;
-
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -60,54 +59,45 @@ import scala.Tuple2;
public class GroupCombineFunctions {
/**
- * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} to a Spark RDD.
+ * Apply {@link org.apache.beam.sdk.transforms.GroupByKey} to a Spark RDD.
*/
- public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyOnly(
- JavaRDD<WindowedValue<KV<K, V>>> rdd, KvCoder<K, V> coder) {
+ public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K,
+ Iterable<V>>>> groupByKey(JavaRDD<WindowedValue<KV<K, V>>> rdd,
+ Accumulator<NamedAggregators> accum,
+ KvCoder<K, V> coder,
+ SparkRuntimeContext runtimeContext,
+ WindowingStrategy<?, W> windowingStrategy) {
+ //--- coders.
final Coder<K> keyCoder = coder.getKeyCoder();
final Coder<V> valueCoder = coder.getValueCoder();
+ final WindowedValue.WindowedValueCoder<V> wvCoder = WindowedValue.FullWindowedValueCoder.of(
+ valueCoder, windowingStrategy.getWindowFn().windowCoder());
+
+ //--- groupByKey.
// Use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
- return rdd.map(WindowingHelpers.<KV<K, V>>unwindowFunction())
- .mapToPair(TranslationUtils.<K, V>toPairFunction())
- .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
- .groupByKey()
- .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder))
- // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
- .map(TranslationUtils.<K, Iterable<V>>fromPairFunction())
- .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
- }
-
- /**
- * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow} to a Spark RDD.
- */
- public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>>
- groupAlsoByWindow(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> rdd,
- GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V> transform,
- SparkRuntimeContext runtimeContext,
- Accumulator<NamedAggregators> accum,
- KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder) {
- //--- coders.
- Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
- IterableCoder<WindowedValue<V>> inputIterableValueCoder =
- (IterableCoder<WindowedValue<V>>) inputValueCoder;
- Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
- WindowedValue.WindowedValueCoder<V> inputIterableWindowedValueCoder =
- (WindowedValue.WindowedValueCoder<V>) inputIterableElementCoder;
- Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+ JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
+ rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, WindowedValue<V>>>(null,
+ new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, null, null))
+ .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
+ .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
+ .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
+ .groupByKey()
+ .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
+ // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
+ .map(TranslationUtils.<K, Iterable<WindowedValue<V>>>fromPairFunction())
+ .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<V>>>>windowFunction());
- @SuppressWarnings("unchecked")
- WindowingStrategy<?, W> windowingStrategy =
- (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+ //--- now group also by window.
@SuppressWarnings("unchecked")
WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
-
// GroupAlsoByWindow currently uses a dummy in-memory StateInternals
OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(),
- SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, windowFn));
+ SystemReduceFn.<K, V, W>buffering(valueCoder));
+ return groupedByKey.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null,
+ windowFn));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b55e3b2..2e682c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -32,8 +32,6 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.SourceRDD;
@@ -51,6 +49,7 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -58,7 +57,6 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -112,10 +110,10 @@ public final class TransformTranslator {
};
}
- private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
- return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
+ private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
+ return new TransformEvaluator<GroupByKey<K, V>>() {
@Override
- public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<KV<K, V>>> inRDD =
(JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform);
@@ -123,30 +121,11 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
- context.setOutputRDD(transform, GroupCombineFunctions.groupByKeyOnly(inRDD, coder));
- }
- };
- }
-
- private static <K, V, W extends BoundedWindow>
- TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
- return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
- @Override
- public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> inRDD =
- (JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
- context.getInputRDD(transform);
-
- @SuppressWarnings("unchecked")
- final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder();
-
final Accumulator<NamedAggregators> accum =
- AccumulatorSingleton.getInstance(context.getSparkContext());
+ AccumulatorSingleton.getInstance(context.getSparkContext());
- context.setOutputRDD(transform, GroupCombineFunctions.groupAlsoByWindow(inRDD, transform,
- context.getRuntimeContext(), accum, inputKvCoder));
+ context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder,
+ context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy()));
}
};
}
@@ -161,13 +140,10 @@ public final class TransformTranslator {
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
context.getInput(transform);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform
- .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(),
- windowingStrategy);
@SuppressWarnings("unchecked")
CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
(CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
- CombineFnUtil.toFnWithContext(appliedFn.getFn());
+ CombineFnUtil.toFnWithContext(transform.getFn());
@SuppressWarnings("unchecked")
JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
@@ -592,8 +568,7 @@ public final class TransformTranslator {
EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
EVALUATORS.put(ParDo.Bound.class, parDo());
EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
- EVALUATORS.put(GroupByKeyOnly.class, gbko());
- EVALUATORS.put(GroupAlsoByWindow.class, gabw());
+ EVALUATORS.put(GroupByKey.class, groupByKey());
EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 9f2d764..1af5e07 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -26,9 +26,6 @@ import java.util.Map;
import java.util.Set;
import kafka.serializer.Decoder;
import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.ConsoleIO;
@@ -50,6 +47,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -58,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -84,6 +81,7 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
+
/**
* Supports translation between a Beam transform, and Spark's operations on DStreams.
*/
@@ -231,10 +229,10 @@ public final class StreamingTransformTranslator {
};
}
- private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
- return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
+ private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
+ return new TransformEvaluator<GroupByKey<K, V>>() {
@Override
- public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
@SuppressWarnings("unchecked")
@@ -244,13 +242,20 @@ public final class StreamingTransformTranslator {
@SuppressWarnings("unchecked")
final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder();
+ final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final WindowingStrategy<?, ?> windowingStrategy =
+ sec.getInput(transform).getWindowingStrategy();
+
JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
@Override
public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(
JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
- return GroupCombineFunctions.groupByKeyOnly(rdd, coder);
+ final Accumulator<NamedAggregators> accum =
+ AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+ return GroupCombineFunctions.groupByKey(rdd, accum, coder, runtimeContext,
+ windowingStrategy);
}
});
sec.setStream(transform, outStream);
@@ -258,39 +263,6 @@ public final class StreamingTransformTranslator {
};
}
- private static <K, V, W extends BoundedWindow>
- TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
- return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
- @Override
- public void evaluate(final GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
- final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
- @SuppressWarnings("unchecked")
- JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> dStream =
- (JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
- sec.getStream(transform);
-
- @SuppressWarnings("unchecked")
- final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) sec.getInput(transform).getCoder();
-
- JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
- dStream.transform(new Function<JavaRDD<WindowedValue<KV<K,
- Iterable<WindowedValue<V>>>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
- @Override
- public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(JavaRDD<WindowedValue<KV<K,
- Iterable<WindowedValue<V>>>>> rdd) throws Exception {
- final Accumulator<NamedAggregators> accum =
- AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
- return GroupCombineFunctions.groupAlsoByWindow(rdd, transform, runtimeContext,
- accum, inputKvCoder);
- }
- });
- sec.setStream(transform, outStream);
- }
- };
- }
-
private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
combineGrouped() {
return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
@@ -302,12 +274,10 @@ public final class StreamingTransformTranslator {
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
sec.getInput(transform);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform
- .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(),
- windowingStrategy);
@SuppressWarnings("unchecked")
- CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
- (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) appliedFn.getFn();
+ final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
+ (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
+ CombineFnUtil.toFnWithContext(transform.getFn());
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
@@ -485,8 +455,7 @@ public final class StreamingTransformTranslator {
.newHashMap();
static {
- EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, gbko());
- EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow.class, gabw());
+ EVALUATORS.put(GroupByKey.class, groupByKey());
EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 96c286a..0a804ae 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -90,6 +90,6 @@ public class SparkSideInputReader implements SideInputReader {
@Override
public boolean isEmpty() {
- return sideInputs.isEmpty();
+ return sideInputs != null && sideInputs.isEmpty();
}
}
[2/2] incubator-beam git commit: This closes #1162
Posted by am...@apache.org.
This closes #1162
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53fe3ee4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53fe3ee4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53fe3ee4
Branch: refs/heads/master
Commit: 53fe3ee425163a76b69d0830449d222d925eb9cd
Parents: f2fe1ae a54ded3
Author: Sela <an...@paypal.com>
Authored: Wed Oct 26 10:01:51 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 10:01:51 2016 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 19 ------
.../translation/GroupCombineFunctions.java | 66 +++++++++-----------
.../spark/translation/TransformTranslator.java | 43 +++----------
.../streaming/StreamingTransformTranslator.java | 65 +++++--------------
.../spark/util/SparkSideInputReader.java | 2 +-
5 files changed, 55 insertions(+), 140 deletions(-)
----------------------------------------------------------------------