Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:47:47 UTC

[13/50] incubator-beam git commit: [BEAM-799] Support GroupByKey directly.

[BEAM-799] Support GroupByKey directly.

Remove runner override for GroupByKey.

Avoid NPE if no sideInputs are available in reader.

Handle CombineFn with or without context.
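
In short, the runner now translates GroupByKey with a single evaluator instead
of expanding it into GroupByKeyOnly + GroupAlsoByWindow. What follows is a
condensed sketch of the body of the new GroupCombineFunctions.groupByKey helper
added below (same helper names and generics as in the hunk); sketches for the
CombineFn normalization and the side-input guard follow the corresponding
hunks further down.

// Widen the value coder to a WindowedValueCoder so each value's timestamp
// and windows survive the byte-level shuffle.
final Coder<K> keyCoder = coder.getKeyCoder();
final Coder<V> valueCoder = coder.getValueCoder();
final WindowedValue.WindowedValueCoder<V> wvCoder =
    WindowedValue.FullWindowedValueCoder.of(
        valueCoder, windowingStrategy.getWindowFn().windowCoder());

// Step 1: reify timestamps/windows into the value, then shuffle by the
// coder-encoded key using Spark's native groupByKey.
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
    rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, WindowedValue<V>>>(null,
            new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, null, null))
        .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
        .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
        .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
        .groupByKey()
        .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
        .map(TranslationUtils.<K, Iterable<WindowedValue<V>>>fromPairFunction())
        .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<V>>>>windowFunction());

// Step 2: group the reified values by window with the SDK's
// output-buffering GroupAlsoByWindow DoFn, replacing the separate
// GroupAlsoByWindow translation this commit removes.
@SuppressWarnings("unchecked")
WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
    new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
        windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(),
        SystemReduceFn.<K, V, W>buffering(valueCoder));
return groupedByKey.mapPartitions(
    new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, windowFn));

Reifying the timestamp and windows into the shuffled value is what lets one
Spark groupByKey replace the former two-transform expansion without losing
window information.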


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a54ded37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a54ded37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a54ded37

Branch: refs/heads/apex-runner
Commit: a54ded373fa7f6508fb46eea1a1d6f9bc405114b
Parents: f2fe1ae
Author: Sela <an...@paypal.com>
Authored: Sat Oct 22 14:51:50 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 10:00:45 2016 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 19 ------
 .../translation/GroupCombineFunctions.java      | 66 +++++++++-----------
 .../spark/translation/TransformTranslator.java  | 43 +++----------
 .../streaming/StreamingTransformTranslator.java | 65 +++++--------------
 .../spark/util/SparkSideInputReader.java        |  2 +-
 5 files changed, 55 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index b17c38c..45c7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark;
 
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
@@ -115,23 +113,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   }
 
   /**
-   * Overrides for this runner.
-   */
-  @SuppressWarnings("rawtypes")
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-
-    if (transform instanceof GroupByKey) {
-      return (OutputT) ((PCollection) input).apply(
-          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-
-  /**
    * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
    * thread.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index de02b26..e2a0f87 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -20,11 +20,9 @@ package org.apache.beam.runners.spark.translation;
 
 
 import com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -60,54 +59,45 @@ import scala.Tuple2;
 public class GroupCombineFunctions {
 
   /**
-   * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} to a Spark RDD.
+   * Apply {@link org.apache.beam.sdk.transforms.GroupByKey} to a Spark RDD.
    */
-  public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyOnly(
-      JavaRDD<WindowedValue<KV<K, V>>> rdd, KvCoder<K, V> coder) {
+  public static <K, V,  W extends BoundedWindow> JavaRDD<WindowedValue<KV<K,
+      Iterable<V>>>> groupByKey(JavaRDD<WindowedValue<KV<K, V>>> rdd,
+                                Accumulator<NamedAggregators> accum,
+                                KvCoder<K, V> coder,
+                                SparkRuntimeContext runtimeContext,
+                                WindowingStrategy<?, W> windowingStrategy) {
+    //--- coders.
     final Coder<K> keyCoder = coder.getKeyCoder();
     final Coder<V> valueCoder = coder.getValueCoder();
+    final WindowedValue.WindowedValueCoder<V> wvCoder = WindowedValue.FullWindowedValueCoder.of(
+        valueCoder, windowingStrategy.getWindowFn().windowCoder());
+
+    //--- groupByKey.
     // Use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
-    return rdd.map(WindowingHelpers.<KV<K, V>>unwindowFunction())
-        .mapToPair(TranslationUtils.<K, V>toPairFunction())
-        .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
-        .groupByKey()
-        .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder))
-        // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
-        .map(TranslationUtils.<K, Iterable<V>>fromPairFunction())
-        .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
-  }
-
-  /**
-   * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow} to a Spark RDD.
-   */
-  public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>>
-  groupAlsoByWindow(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> rdd,
-                    GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V> transform,
-                    SparkRuntimeContext runtimeContext,
-                    Accumulator<NamedAggregators> accum,
-                    KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder) {
-    //--- coders.
-    Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
-    IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-        (IterableCoder<WindowedValue<V>>) inputValueCoder;
-    Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
-    WindowedValue.WindowedValueCoder<V> inputIterableWindowedValueCoder =
-        (WindowedValue.WindowedValueCoder<V>) inputIterableElementCoder;
-    Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+    JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
+        rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, WindowedValue<V>>>(null,
+                new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, null, null))
+            .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
+            .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
+            .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
+            .groupByKey()
+            .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
+            // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
+            .map(TranslationUtils.<K, Iterable<WindowedValue<V>>>fromPairFunction())
+            .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<V>>>>windowFunction());
 
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<?, W> windowingStrategy =
-        (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+    //--- now group also by window.
     @SuppressWarnings("unchecked")
     WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
-
     // GroupAlsoByWindow current uses a dummy in-memory StateInternals
     OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
         new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
             windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(),
-                SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, windowFn));
+                SystemReduceFn.<K, V, W>buffering(valueCoder));
+    return groupedByKey.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null,
+        windowFn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b55e3b2..2e682c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -32,8 +32,6 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.SourceRDD;
@@ -51,6 +49,7 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -58,7 +57,6 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -112,10 +110,10 @@ public final class TransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
-    return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
+  private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
+    return new TransformEvaluator<GroupByKey<K, V>>() {
       @Override
-      public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+      public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<KV<K, V>>> inRDD =
             (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform);
@@ -123,30 +121,11 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
 
-        context.setOutputRDD(transform, GroupCombineFunctions.groupByKeyOnly(inRDD, coder));
-      }
-    };
-  }
-
-  private static <K, V, W extends BoundedWindow>
-      TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
-    return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
-      @Override
-      public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> inRDD =
-            (JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
-                context.getInputRDD(transform);
-
-        @SuppressWarnings("unchecked")
-        final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-            (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder();
-
         final Accumulator<NamedAggregators> accum =
-            AccumulatorSingleton.getInstance(context.getSparkContext());
+                AccumulatorSingleton.getInstance(context.getSparkContext());
 
-        context.setOutputRDD(transform, GroupCombineFunctions.groupAlsoByWindow(inRDD, transform,
-            context.getRuntimeContext(), accum, inputKvCoder));
+        context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder,
+            context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy()));
       }
     };
   }
@@ -161,13 +140,10 @@ public final class TransformTranslator {
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
             context.getInput(transform);
         WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform
-            .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(),
-                windowingStrategy);
         @SuppressWarnings("unchecked")
         CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
-                CombineFnUtil.toFnWithContext(appliedFn.getFn());
+                CombineFnUtil.toFnWithContext(transform.getFn());
 
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
@@ -592,8 +568,7 @@ public final class TransformTranslator {
     EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
-    EVALUATORS.put(GroupByKeyOnly.class, gbko());
-    EVALUATORS.put(GroupAlsoByWindow.class, gabw());
+    EVALUATORS.put(GroupByKey.class, groupByKey());
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
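
Aside: the "CombineFn with or without context" line of the commit message is
the pattern used in combineGrouped() above (and in its streaming twin below);
a condensed sketch, with names as in the hunks:

// CombineFnUtil.toFnWithContext normalizes both flavors: a plain keyed
// CombineFn is wrapped into a KeyedCombineFnWithContext, and a fn that
// already takes a Context passes through unchanged, so the translator only
// has to handle the with-context signature.
@SuppressWarnings("unchecked")
CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
    (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
        CombineFnUtil.toFnWithContext(transform.getFn());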

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 9f2d764..1af5e07 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -26,9 +26,6 @@ import java.util.Map;
 import java.util.Set;
 import kafka.serializer.Decoder;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.ConsoleIO;
@@ -50,6 +47,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -58,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -84,6 +81,7 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
 
 
+
 /**
  * Supports translation between a Beam transform, and Spark's operations on DStreams.
  */
@@ -231,10 +229,10 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
-    return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
+  private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
+    return new TransformEvaluator<GroupByKey<K, V>>() {
       @Override
-      public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+      public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
 
         @SuppressWarnings("unchecked")
@@ -244,13 +242,20 @@ public final class StreamingTransformTranslator {
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder();
 
+        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final WindowingStrategy<?, ?> windowingStrategy =
+            sec.getInput(transform).getWindowingStrategy();
+
         JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
                 JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
           @Override
           public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(
               JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
-            return GroupCombineFunctions.groupByKeyOnly(rdd, coder);
+            final Accumulator<NamedAggregators> accum =
+                AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+            return GroupCombineFunctions.groupByKey(rdd, accum, coder, runtimeContext,
+                windowingStrategy);
           }
         });
         sec.setStream(transform, outStream);
@@ -258,39 +263,6 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <K, V, W extends BoundedWindow>
-      TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
-    return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
-      @Override
-      public void evaluate(final GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
-        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
-        @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> dStream =
-            (JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
-                sec.getStream(transform);
-
-        @SuppressWarnings("unchecked")
-        final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-            (KvCoder<K, Iterable<WindowedValue<V>>>) sec.getInput(transform).getCoder();
-
-        JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
-            dStream.transform(new Function<JavaRDD<WindowedValue<KV<K,
-                Iterable<WindowedValue<V>>>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
-              @Override
-              public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(JavaRDD<WindowedValue<KV<K,
-                  Iterable<WindowedValue<V>>>>> rdd) throws Exception {
-                final Accumulator<NamedAggregators> accum =
-                    AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
-                return GroupCombineFunctions.groupAlsoByWindow(rdd, transform, runtimeContext,
-                    accum, inputKvCoder);
-              }
-            });
-        sec.setStream(transform, outStream);
-      }
-    };
-  }
-
   private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
   combineGrouped() {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
@@ -302,12 +274,10 @@ public final class StreamingTransformTranslator {
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
             sec.getInput(transform);
         WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform
-            .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(),
-                windowingStrategy);
         @SuppressWarnings("unchecked")
-        CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
-            (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) appliedFn.getFn();
+        final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
+            (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
+                CombineFnUtil.toFnWithContext(transform.getFn());
 
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
@@ -485,8 +455,7 @@ public final class StreamingTransformTranslator {
       .newHashMap();
 
   static {
-    EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, gbko());
-    EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow.class, gabw());
+    EVALUATORS.put(GroupByKey.class, groupByKey());
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 96c286a..0a804ae 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -90,6 +90,6 @@ public class SparkSideInputReader implements SideInputReader {
 
   @Override
   public boolean isEmpty() {
-    return sideInputs.isEmpty();
+    return sideInputs != null && sideInputs.isEmpty();
   }
 }
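
A closing note on this last hunk, which is the "Avoid NPE" line of the commit
message: the new groupByKey translation appears to construct DoFnFunction with
null where the side inputs would otherwise go (see the ReifyTimestampAndWindows
step in GroupCombineFunctions above), so the reader can now receive a null map
and must not dereference it:

// Minimal sketch of the guarded method: with no side inputs configured,
// the sideInputs map may now legitimately be null.
@Override
public boolean isEmpty() {
  return sideInputs != null && sideInputs.isEmpty();
}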