Posted to commits@beam.apache.org by am...@apache.org on 2016/10/19 22:33:05 UTC

[2/3] incubator-beam git commit: [BEAM-259] Execute selected RunnableOnService tests with Spark runner.

[BEAM-259] Execute selected RunnableOnService tests with Spark runner.

Handle empty Flatten for bounded.

Spark will bubble out a SparkException for user code failures, so tests can't catch the expected exception type directly. Asserting on the
error message should be good enough.
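
A hypothetical test-side snippet illustrating the approach (the ExpectedException rule and the message are illustrative, not part of this change):

    // assuming a JUnit rule: @Rule public transient ExpectedException thrown = ExpectedException.none();
    // The user exception surfaces wrapped by Spark, so assert on the message rather than
    // on the concrete exception type.
    thrown.expect(RuntimeException.class);
    thrown.expectMessage("expected user code failure message");  // illustrative message
    pipeline.run();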

outputWithTimestamp should handle start/finishBundle as well.

Explode WindowedValues before processing.
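
A minimal sketch of what exploding means here (illustrative only; processElement stands in for the actual per-element processing):

    // WindowedValue#explodeWindows() yields one copy of the value per window it was
    // assigned to, so each copy reaches the DoFn carrying exactly one window.
    for (WindowedValue<InputT> singleWindowValue : windowedValue.explodeWindows()) {
      processElement(singleWindowValue);
    }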

sideOutputWithTimestamp to address start/finishBundle.

SideInput with windows.

Unused for now, remove.

Take sideInput window strategy into account, for combine as well.

Reduce code duplication.

Spark combine support.

Reuse code where possible.

Expose sideInputs and insertDefault in Combine.Globally for direct translation.

Direct translation of Combine.Globally into Spark's aggregate function.

Make the default master use 4 cores (local[4]) - makes tests run with multiple threads, but not too many.

SideInputReader for the Spark runner.

A common abstraction for the Keyed and Global implementations.

Implement Combine.Globally via Spark's aggregate.
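
As background, a simplified sketch of that mapping (illustrative only - it ignores windowing, coders and side inputs, which the actual SparkGlobalCombineFn below handles; assumes org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.Function2, java.util.Arrays and Beam's Combine.CombineFn):

    // Spark's JavaRDD#aggregate(zeroValue, seqOp, combOp) lines up with Beam's CombineFn:
    //   zeroValue -> createAccumulator(), seqOp -> addInput(), combOp -> mergeAccumulators().
    static <InputT, AccumT, OutputT> OutputT combineGloballySketch(
        JavaRDD<InputT> rdd, final Combine.CombineFn<InputT, AccumT, OutputT> fn) {
      AccumT acc = rdd.aggregate(
          fn.createAccumulator(),
          new Function2<AccumT, InputT, AccumT>() {
            @Override
            public AccumT call(AccumT a, InputT i) {
              return fn.addInput(a, i);
            }
          },
          new Function2<AccumT, AccumT, AccumT>() {
            @Override
            public AccumT call(AccumT a1, AccumT a2) {
              return fn.mergeAccumulators(Arrays.asList(a1, a2));
            }
          });
      return fn.extractOutput(acc);
    }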

runnable-on-service profile doesn't need pluginManagement.

Removing test as it does not follow a deterministic combine implementation.

Context reuse is mostly for testing. To avoid a test failure that stops the context and fails all
following tests, we need to recreate the context if it has been stopped as well.

Only the WindowFn is used, so there's no need to pass the entire WindowingStrategy.

Explode elements for processing only when necessary.

The SparkRunner should use Beam's UserCodeException instead of its own custom
SparkProcessException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7eecd7ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7eecd7ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7eecd7ee

Branch: refs/heads/master
Commit: 7eecd7ee73cdd2b41e785b4540852530deead429
Parents: b0cb2e8
Author: Sela <an...@paypal.com>
Authored: Fri Sep 23 13:32:28 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu Oct 20 00:51:48 2016 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  73 ++---
 .../runners/spark/SparkPipelineOptions.java     |   2 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  15 +-
 .../beam/runners/spark/TestSparkRunner.java     |   4 +-
 .../runners/spark/translation/DoFnFunction.java |  74 ++---
 .../translation/GroupCombineFunctions.java      | 235 +++++++++-------
 .../spark/translation/MultiDoFnFunction.java    |  85 +++---
 .../translation/SparkAbstractCombineFn.java     | 134 +++++++++
 .../spark/translation/SparkContextFactory.java  |   3 +-
 .../spark/translation/SparkGlobalCombineFn.java | 260 ++++++++++++++++++
 .../spark/translation/SparkKeyedCombineFn.java  | 273 +++++++++++++++++++
 .../spark/translation/SparkProcessContext.java  | 160 +++++++++--
 .../spark/translation/TransformTranslator.java  | 143 +++++-----
 .../spark/translation/TranslationUtils.java     |  28 +-
 .../streaming/StreamingTransformTranslator.java | 126 +++++----
 .../runners/spark/util/BroadcastHelper.java     |  26 --
 .../spark/util/SparkSideInputReader.java        |  95 +++++++
 .../spark/translation/CombineGloballyTest.java  | 101 -------
 .../translation/SparkPipelineOptionsTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  14 +
 20 files changed, 1318 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 60b2de9..a246c19 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -59,40 +59,40 @@
       <id>local-runnable-on-service-tests</id>
       <activation><activeByDefault>false</activeByDefault></activation>
       <build>
-        <pluginManagement>
-          <plugins>
-            <plugin>
-              <groupId>org.apache.maven.plugins</groupId>
-              <artifactId>maven-surefire-plugin</artifactId>
-              <executions>
-                <execution>
-                  <id>runnable-on-service-tests</id>
-                  <configuration>
-                    <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
-                    <parallel>none</parallel>
-                    <failIfNoTests>true</failIfNoTests>
-                    <dependenciesToScan>
-                      <dependency>org.apache.beam:java-sdk-all</dependency>
-                    </dependenciesToScan>
-                    <excludes>
-                      org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest
-                    </excludes>
-                    <systemPropertyVariables>
-                      <beamTestPipelineOptions>
-                        [
-                          "--runner=TestSparkRunner",
-                          "--streaming=false"
-                        ]
-                      </beamTestPipelineOptions>
-                      <beam.spark.test.reuseSparkContext>true</beam.spark.test.reuseSparkContext>
-                      <spark.ui.enabled>false</spark.ui.enabled>
-                    </systemPropertyVariables>
-                  </configuration>
-                </execution>
-              </executions>
-            </plugin>
-          </plugins>
-        </pluginManagement>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>runnable-on-service-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <forkCount>1</forkCount>
+                  <reuseForks>false</reuseForks>
+                  <failIfNoTests>true</failIfNoTests>
+                  <dependenciesToScan>
+                    <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+                  </dependenciesToScan>
+                  <systemPropertyVariables>
+                    <beamTestPipelineOptions>
+                      [
+                        "--runner=TestSparkRunner",
+                        "--streaming=false"
+                      ]
+                    </beamTestPipelineOptions>
+                    <beam.spark.test.reuseSparkContext>true</beam.spark.test.reuseSparkContext>
+                    <spark.ui.enabled>false</spark.ui.enabled>
+                  </systemPropertyVariables>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
       </build>
     </profile>
   </profiles>
@@ -147,6 +147,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>1.3.9</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
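
With the surefire execution now under <plugins> and bound to the integration-test phase, the suite can be run directly against the Spark runner, e.g. with something like "mvn -pl runners/spark -Plocal-runnable-on-service-tests integration-test" (the exact invocation is illustrative, not part of this change).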

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4c20b10..92d2a7f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -37,7 +37,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingListener;
 public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
                                               ApplicationNameOptions {
   @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
-  @Default.String("local[1]")
+  @Default.String("local[4]")
   String getSparkMaster();
   void setSparkMaster(String master);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 188479c..cad53be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkProcessContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
@@ -37,6 +36,7 @@ import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
  *
  * {@code
  * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkRunner.create().run(p);
+ * EvaluationResult result = (EvaluationResult) p.run();
  * }
  *
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * Pipeline p = [logic for pipeline creation]
  * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkRunner.create(options).run(p);
+ * EvaluationResult result = (EvaluationResult) p.run();
  * }
  */
 public final class SparkRunner extends PipelineRunner<EvaluationResult> {
@@ -172,12 +172,11 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
       // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
       // won't let you catch something that is not declared, so we can't catch
       // SparkException here. Instead we do an instanceof check.
-      // Then we find the cause by seeing if it's a user exception (wrapped by our
-      // SparkProcessException), or just use the SparkException cause.
+      // Then we find the cause by seeing if it's a user exception (wrapped by Beam's
+      // UserCodeException), or just use the SparkException cause.
       if (e instanceof SparkException && e.getCause() != null) {
-        if (e.getCause() instanceof SparkProcessContext.SparkProcessException
-            && e.getCause().getCause() != null) {
-          throw new RuntimeException(e.getCause().getCause());
+        if (e.getCause() instanceof UserCodeException && e.getCause().getCause() != null) {
+          throw UserCodeException.wrap(e.getCause().getCause());
         } else {
           throw new RuntimeException(e.getCause());
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 6ad6556..a4ddca0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.POutput;
  *
  * {@code
  * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkRunner.create().run(p);
+ * EvaluationResult result = (EvaluationResult) p.run();
  * }
  *
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.values.POutput;
  * Pipeline p = [logic for pipeline creation]
  * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkRunner.create(options).run(p);
+ * EvaluationResult result = (EvaluationResult) p.run();
  * }
  */
 public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 79639a2..4dfbee6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -25,12 +25,15 @@ import java.util.Map;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+
 
 /**
  * Beam's Do functions correspond to Spark's FlatMap functions.
@@ -42,75 +45,46 @@ public class DoFnFunction<InputT, OutputT>
     implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
   private final Accumulator<NamedAggregators> accum;
   private final OldDoFn<InputT, OutputT> mFunction;
-  private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class);
-
   private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs;
+  private final WindowFn<Object, ?> windowFn;
 
   /**
-   * @param accum      The Spark Accumulator that handles the Beam Aggregators.
-   * @param fn         DoFunction to be wrapped.
-   * @param runtime    Runtime to apply function in.
-   * @param sideInputs Side inputs used in DoFunction.
+   * @param accum             The Spark Accumulator that handles the Beam Aggregators.
+   * @param fn                DoFunction to be wrapped.
+   * @param runtime           Runtime to apply function in.
+   * @param sideInputs        Side inputs used in DoFunction.
+   * @param windowFn          Input {@link WindowFn}.
    */
   public DoFnFunction(Accumulator<NamedAggregators> accum,
                       OldDoFn<InputT, OutputT> fn,
                       SparkRuntimeContext runtime,
-                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+                      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+                      WindowFn<Object, ?> windowFn) {
     this.accum = accum;
     this.mFunction = fn;
     this.mRuntimeContext = runtime;
     this.mSideInputs = sideInputs;
+    this.windowFn = windowFn;
   }
 
-  /**
-   * @param fn         DoFunction to be wrapped.
-   * @param runtime    Runtime to apply function in.
-   * @param sideInputs Side inputs used in DoFunction.
-   */
-  public DoFnFunction(OldDoFn<InputT, OutputT> fn,
-                      SparkRuntimeContext runtime,
-                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this(null, fn, runtime, sideInputs);
-  }
 
   @Override
   public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws
       Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    ctxt.setup();
-    try {
-      mFunction.setup();
-      mFunction.startBundle(ctxt);
-      return ctxt.getOutputIterable(iter, mFunction);
-    } catch (Exception e) {
-      try {
-        // this teardown handles exceptions encountered in setup() and startBundle(). teardown
-        // after execution or due to exceptions in process element is called in the iterator
-        // produced by ctxt.getOutputIterable returned from this method.
-        mFunction.teardown();
-      } catch (Exception teardownException) {
-        LOG.error(
-            "Suppressing exception while tearing down Function {}", mFunction, teardownException);
-        e.addSuppressed(teardownException);
-      }
-      throw e;
-    }
+    return new ProcCtxt(mFunction, mRuntimeContext, mSideInputs, windowFn)
+        .callWithCtxt(iter);
   }
 
   private class ProcCtxt extends SparkProcessContext<InputT, OutputT, WindowedValue<OutputT>> {
 
     private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
 
-    ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(OutputT o) {
-      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInGlobalWindow(o));
+    ProcCtxt(OldDoFn<InputT, OutputT> fn,
+             SparkRuntimeContext runtimeContext,
+             Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+             WindowFn<Object, ?> windowFn) {
+      super(fn, runtimeContext, sideInputs, windowFn);
     }
 
     @Override
@@ -120,10 +94,6 @@ public class DoFnFunction<InputT, OutputT>
 
     @Override
     public Accumulator<NamedAggregators> getAccumulator() {
-      if (accum == null) {
-        throw new UnsupportedOperationException("SparkRunner does not provide Aggregator support "
-             + "for DoFnFunction of type: " + mFunction.getClass().getCanonicalName());
-      }
       return accum;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 18926bc..de02b26 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -20,33 +20,40 @@ package org.apache.beam.runners.spark.translation;
 
 
 import com.google.common.collect.Lists;
-import java.util.Arrays;
+
 import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
 
+
+
 /**
  * A set of group/combine functions to apply to Spark {@link org.apache.spark.rdd.RDD}s.
  */
@@ -92,92 +99,160 @@ public class GroupCombineFunctions {
     @SuppressWarnings("unchecked")
     WindowingStrategy<?, W> windowingStrategy =
         (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+    @SuppressWarnings("unchecked")
+    WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
 
     // GroupAlsoByWindow current uses a dummy in-memory StateInternals
     OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
         new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
             windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(),
                 SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null));
+    return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, windowFn));
   }
 
   /**
    * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation.
    */
-  public static <InputT, AccumT, OutputT> OutputT
+  public static <InputT, AccumT, OutputT> JavaRDD<WindowedValue<OutputT>>
   combineGlobally(JavaRDD<WindowedValue<InputT>> rdd,
-                  final Combine.CombineFn<InputT, AccumT, OutputT> globally,
+                  final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
                   final Coder<InputT> iCoder,
-                  final Coder<AccumT> aCoder) {
+                  final Coder<OutputT> oCoder,
+                  final SparkRuntimeContext runtimeContext,
+                  final WindowingStrategy<?, ?> windowingStrategy,
+                  final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                      sideInputs,
+                  boolean hasDefault) {
+    // handle empty input RDD, which will natively skip the entire execution as Spark will not
+    // run on empty RDDs.
+    if (rdd.isEmpty()) {
+      JavaSparkContext jsc = new JavaSparkContext(rdd.context());
+      if (hasDefault) {
+        OutputT defaultValue = combineFn.defaultValue();
+        return jsc
+            .parallelize(Lists.newArrayList(CoderHelpers.toByteArray(defaultValue, oCoder)))
+            .map(CoderHelpers.fromByteFunction(oCoder))
+            .map(WindowingHelpers.<OutputT>windowFunction());
+      } else {
+        return jsc.emptyRDD();
+      }
+    }
+
+    //--- coders.
+    final Coder<AccumT> aCoder;
+    try {
+      aCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), iCoder);
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalStateException("Could not determine coder for accumulator", e);
+    }
+    // windowed coders.
+    final WindowedValue.FullWindowedValueCoder<InputT> wviCoder =
+        WindowedValue.FullWindowedValueCoder.of(iCoder,
+            windowingStrategy.getWindowFn().windowCoder());
+    final WindowedValue.FullWindowedValueCoder<AccumT> wvaCoder =
+        WindowedValue.FullWindowedValueCoder.of(aCoder,
+            windowingStrategy.getWindowFn().windowCoder());
+    final WindowedValue.FullWindowedValueCoder<OutputT> wvoCoder =
+        WindowedValue.FullWindowedValueCoder.of(oCoder,
+            windowingStrategy.getWindowFn().windowCoder());
+
+    final SparkGlobalCombineFn<InputT, AccumT, OutputT> sparkCombineFn =
+        new SparkGlobalCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy);
+    final IterableCoder<WindowedValue<AccumT>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+
     // Use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
-    JavaRDD<byte[]> inRddBytes = rdd.map(WindowingHelpers.<InputT>unwindowFunction()).map(
-        CoderHelpers.toByteFunction(iCoder));
+    JavaRDD<byte[]> inRddBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
     /*AccumT*/ byte[] acc = inRddBytes.aggregate(
-        CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
+        CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
         new Function2</*AccumT*/ byte[], /*InputT*/ byte[], /*AccumT*/ byte[]>() {
           @Override
           public /*AccumT*/ byte[] call(/*AccumT*/ byte[] ab, /*InputT*/ byte[] ib)
               throws Exception {
-            AccumT a = CoderHelpers.fromByteArray(ab, aCoder);
-            InputT i = CoderHelpers.fromByteArray(ib, iCoder);
-            return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
+            Iterable<WindowedValue<AccumT>> a = CoderHelpers.fromByteArray(ab, iterAccumCoder);
+            WindowedValue<InputT> i = CoderHelpers.fromByteArray(ib, wviCoder);
+            return CoderHelpers.toByteArray(sparkCombineFn.seqOp(a, i), iterAccumCoder);
           }
         },
         new Function2</*AccumT*/ byte[], /*AccumT*/ byte[], /*AccumT*/ byte[]>() {
           @Override
           public /*AccumT*/ byte[] call(/*AccumT*/ byte[] a1b, /*AccumT*/ byte[] a2b)
               throws Exception {
-            AccumT a1 = CoderHelpers.fromByteArray(a1b, aCoder);
-            AccumT a2 = CoderHelpers.fromByteArray(a2b, aCoder);
-            // don't use Guava's ImmutableList.of as values may be null
-            List<AccumT> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
-            AccumT merged = globally.mergeAccumulators(accumulators);
-            return CoderHelpers.toByteArray(merged, aCoder);
+            Iterable<WindowedValue<AccumT>> a1 = CoderHelpers.fromByteArray(a1b, iterAccumCoder);
+            Iterable<WindowedValue<AccumT>> a2 = CoderHelpers.fromByteArray(a2b, iterAccumCoder);
+            Iterable<WindowedValue<AccumT>> merged = sparkCombineFn.combOp(a1, a2);
+            return CoderHelpers.toByteArray(merged, iterAccumCoder);
           }
         }
     );
-    return globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
+    Iterable<WindowedValue<OutputT>> output =
+        sparkCombineFn.extractOutput(CoderHelpers.fromByteArray(acc, iterAccumCoder));
+    return new JavaSparkContext(rdd.context()).parallelize(
+        CoderHelpers.toByteArrays(output, wvoCoder)).map(CoderHelpers.fromByteFunction(wvoCoder));
   }
 
   /**
    * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.PerKey} transformation.
+   * <p>
+   * This aggregation will apply Beam's {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
+   * via Spark's {@link JavaPairRDD#combineByKey(Function, Function2, Function2)} aggregation.
+   * </p>
+   * For streaming, this will be called from within a serialized context
+   * (DStream's transform callback), so passed arguments need to be Serializable.
    */
   public static <K, InputT, AccumT, OutputT> JavaRDD<WindowedValue<KV<K, OutputT>>>
   combinePerKey(JavaRDD<WindowedValue<KV<K, InputT>>> rdd,
-                final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed,
-                final WindowedValue.FullWindowedValueCoder<K> wkCoder,
-                final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder,
-                final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder) {
+                final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
+                    combineFn,
+                final KvCoder<K, InputT> inputCoder,
+                final SparkRuntimeContext runtimeContext,
+                final WindowingStrategy<?, ?> windowingStrategy,
+                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
+                    BroadcastHelper<?>>> sideInputs) {
+    //--- coders.
+    final Coder<K> keyCoder = inputCoder.getKeyCoder();
+    final Coder<InputT> viCoder = inputCoder.getValueCoder();
+    final Coder<AccumT> vaCoder;
+    try {
+      vaCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), keyCoder, viCoder);
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalStateException("Could not determine coder for accumulator", e);
+    }
+    // windowed coders.
+    final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
+        WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, viCoder),
+            windowingStrategy.getWindowFn().windowCoder());
+    final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
+        WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, vaCoder),
+            windowingStrategy.getWindowFn().windowCoder());
+
     // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
     // since the functions passed to combineByKey don't receive the associated key of each
     // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
     // require the key in addition to the InputT's and AccumT's being merged/accumulated.
     // Once Spark provides a way to include keys in the arguments of combine/merge functions,
     // we won't need to duplicate the keys anymore.
-    // Key has to bw windowed in order to group by window as well
-    JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
+    // Key has to be windowed in order to group by window as well.
+    JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
         rdd.flatMapToPair(
-            new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>,
+            new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, K,
                 WindowedValue<KV<K, InputT>>>() {
               @Override
-              public Iterable<Tuple2<WindowedValue<K>, WindowedValue<KV<K, InputT>>>>
-              call(WindowedValue<KV<K, InputT>> kv) {
-                  List<Tuple2<WindowedValue<K>,
-                      WindowedValue<KV<K, InputT>>>> tuple2s =
-                      Lists.newArrayListWithCapacity(kv.getWindows().size());
-                  for (BoundedWindow boundedWindow: kv.getWindows()) {
-                    WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
-                        boundedWindow.maxTimestamp(), boundedWindow, kv.getPane());
-                    tuple2s.add(new Tuple2<>(wk, kv));
-                  }
-                return tuple2s;
+              public Iterable<Tuple2<K, WindowedValue<KV<K, InputT>>>>
+              call(WindowedValue<KV<K, InputT>> wkv) {
+                return Collections.singletonList(new Tuple2<>(wkv.getValue().getKey(), wkv));
               }
             });
+
+    final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> sparkCombineFn =
+        new SparkKeyedCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy);
+    final IterableCoder<WindowedValue<KV<K, AccumT>>> iterAccumCoder = IterableCoder.of(wkvaCoder);
+
     // Use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
     JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
-        .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
+        .mapToPair(CoderHelpers.toByteFunction(keyCoder, wkviCoder));
 
     // The output of combineByKey will be "AccumT" (accumulator)
     // types rather than "OutputT" (final output types) since Combine.CombineFn
@@ -187,76 +262,52 @@ public class GroupCombineFunctions {
         new Function</*KV<K, InputT>*/ byte[], /*KV<K, AccumT>*/ byte[]>() {
           @Override
           public /*KV<K, AccumT>*/ byte[] call(/*KV<K, InputT>*/ byte[] input) {
-            WindowedValue<KV<K, InputT>> wkvi =
-                CoderHelpers.fromByteArray(input, wkviCoder);
-            AccumT va = keyed.createAccumulator(wkvi.getValue().getKey());
-            va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
-            WindowedValue<KV<K, AccumT>> wkva =
-                WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
-                wkvi.getWindows(), wkvi.getPane());
-            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+            WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
+            return CoderHelpers.toByteArray(sparkCombineFn.createCombiner(wkvi), iterAccumCoder);
           }
         },
-        new Function2</*KV<K, AccumT>*/ byte[],
-            /*KV<K, InputT>*/ byte[],
+        new Function2</*KV<K, AccumT>*/ byte[], /*KV<K, InputT>*/ byte[],
             /*KV<K, AccumT>*/ byte[]>() {
           @Override
           public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc,
               /*KV<K, InputT>*/ byte[] input) {
-            WindowedValue<KV<K, AccumT>> wkva =
-                CoderHelpers.fromByteArray(acc, wkvaCoder);
-            WindowedValue<KV<K, InputT>> wkvi =
-                CoderHelpers.fromByteArray(input, wkviCoder);
-            AccumT va =
-                keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
-                wkvi.getValue().getValue());
-            wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
-                wkva.getWindows(), wkva.getPane());
-            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+            Iterable<WindowedValue<KV<K, AccumT>>> wkvas =
+                CoderHelpers.fromByteArray(acc, iterAccumCoder);
+            WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
+            return CoderHelpers.toByteArray(sparkCombineFn.mergeValue(wkvi, wkvas), iterAccumCoder);
           }
         },
-        new Function2</*KV<K, AccumT>*/ byte[],
-            /*KV<K, AccumT>*/ byte[],
+        new Function2</*KV<K, AccumT>*/ byte[], /*KV<K, AccumT>*/ byte[],
             /*KV<K, AccumT>*/ byte[]>() {
           @Override
           public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc1,
               /*KV<K, AccumT>*/ byte[] acc2) {
-            WindowedValue<KV<K, AccumT>> wkva1 =
-                CoderHelpers.fromByteArray(acc1, wkvaCoder);
-            WindowedValue<KV<K, AccumT>> wkva2 =
-                CoderHelpers.fromByteArray(acc2, wkvaCoder);
-            AccumT va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
-                // don't use Guava's ImmutableList.of as values may be null
-                Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
-                wkva2.getValue().getValue())));
-            WindowedValue<KV<K, AccumT>> wkva =
-                WindowedValue.of(KV.of(wkva1.getValue().getKey(),
-                va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
-            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+            Iterable<WindowedValue<KV<K, AccumT>>> wkvas1 =
+                CoderHelpers.fromByteArray(acc1, iterAccumCoder);
+            Iterable<WindowedValue<KV<K, AccumT>>> wkvas2 =
+                CoderHelpers.fromByteArray(acc2, iterAccumCoder);
+            return CoderHelpers.toByteArray(sparkCombineFn.mergeCombiners(wkvas1, wkvas2),
+                iterAccumCoder);
           }
         });
 
-    JavaPairRDD<WindowedValue<K>, WindowedValue<OutputT>> extracted = accumulatedBytes
-        .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
-        .mapValues(new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
-              @Override
-              public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
-                return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
-                    acc.getValue().getValue()), acc.getTimestamp(), acc.getWindows(),
-                        acc.getPane());
-              }
-            });
-    return extracted.map(TranslationUtils.<WindowedValue<K>,
-        WindowedValue<OutputT>>fromPairFunction()).map(
-            new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>,
-                WindowedValue<KV<K, OutputT>>>() {
+    JavaPairRDD<K, WindowedValue<OutputT>> extracted = accumulatedBytes
+        .mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder))
+        .flatMapValues(new Function<Iterable<WindowedValue<KV<K, AccumT>>>,
+            Iterable<WindowedValue<OutputT>>>() {
               @Override
-              public WindowedValue<KV<K, OutputT>> call(KV<WindowedValue<K>,
-                  WindowedValue<OutputT>> kwvo) throws Exception {
-                WindowedValue<OutputT> wvo = kwvo.getValue();
-                KV<K, OutputT> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
-                return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
+              public Iterable<WindowedValue<OutputT>> call(
+                  Iterable<WindowedValue<KV<K, AccumT>>> accums) {
+                return sparkCombineFn.extractOutput(accums);
               }
             });
+    return extracted.map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction()).map(
+        new Function<KV<K, WindowedValue<OutputT>>, WindowedValue<KV<K, OutputT>>>() {
+          @Override
+          public WindowedValue<KV<K, OutputT>> call(KV<K, WindowedValue<OutputT>> kwvo)
+              throws Exception {
+            return kwvo.getValue().withValue(KV.of(kwvo.getKey(), kwvo.getValue().getValue()));
+          }
+        });
   }
 }
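
A simplified sketch of the Combine.PerKey mapping used above (illustrative only - keys are assumed to already be duplicated into the value, and windowing, coders and the CombineWithContext variant are left out; assumes org.apache.spark.api.java.JavaPairRDD, the Spark Function/Function2 interfaces, java.util.Arrays, Beam's KV and Combine.KeyedCombineFn):

    // JavaPairRDD#combineByKey(createCombiner, mergeValue, mergeCombiners) lines up with
    // Beam's KeyedCombineFn; the key travels inside the value because Spark does not pass
    // it to the combiner functions.
    static <K, InputT, AccumT, OutputT> JavaPairRDD<K, OutputT> combinePerKeySketch(
        JavaPairRDD<K, KV<K, InputT>> rdd,
        final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> fn) {
      return rdd.combineByKey(
          new Function<KV<K, InputT>, KV<K, AccumT>>() {
            @Override
            public KV<K, AccumT> call(KV<K, InputT> kv) {
              AccumT acc = fn.createAccumulator(kv.getKey());
              return KV.of(kv.getKey(), fn.addInput(kv.getKey(), acc, kv.getValue()));
            }
          },
          new Function2<KV<K, AccumT>, KV<K, InputT>, KV<K, AccumT>>() {
            @Override
            public KV<K, AccumT> call(KV<K, AccumT> acc, KV<K, InputT> kv) {
              return KV.of(acc.getKey(),
                  fn.addInput(acc.getKey(), acc.getValue(), kv.getValue()));
            }
          },
          new Function2<KV<K, AccumT>, KV<K, AccumT>, KV<K, AccumT>>() {
            @Override
            public KV<K, AccumT> call(KV<K, AccumT> a1, KV<K, AccumT> a2) {
              return KV.of(a1.getKey(), fn.mergeAccumulators(a1.getKey(),
                  Arrays.asList(a1.getValue(), a2.getValue())));
            }
          })
          .mapValues(new Function<KV<K, AccumT>, OutputT>() {
            @Override
            public OutputT call(KV<K, AccumT> kv) {
              return fn.extractOutput(kv.getKey(), kv.getValue());
            }
          });
    }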

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 163cf13..1168381 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -27,7 +27,10 @@ import java.util.Map;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -49,51 +52,37 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final OldDoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final TupleTag<OutputT> mMainOutputTag;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs;
+  private final WindowFn<Object, ?> windowFn;
 
   /**
-   * @param accum          The Spark Accumulator that handles the Beam Aggregators.
-   * @param fn             DoFunction to be wrapped.
-   * @param runtimeContext Runtime to apply function in.
-   * @param mainOutputTag  The main output {@link TupleTag}.
-   * @param sideInputs     Side inputs used in DoFunction.
+   * @param accum             The Spark Accumulator that handles the Beam Aggregators.
+   * @param fn                DoFunction to be wrapped.
+   * @param runtimeContext    Runtime to apply function in.
+   * @param mainOutputTag     The main output {@link TupleTag}.
+   * @param sideInputs        Side inputs used in DoFunction.
+   * @param windowFn          Input {@link WindowFn}.
    */
-  public MultiDoFnFunction(
-      Accumulator<NamedAggregators> accum,
-      OldDoFn<InputT, OutputT> fn,
-      SparkRuntimeContext runtimeContext,
-      TupleTag<OutputT> mainOutputTag,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+  public MultiDoFnFunction(Accumulator<NamedAggregators> accum,
+                           OldDoFn<InputT, OutputT> fn,
+                           SparkRuntimeContext runtimeContext,
+                           TupleTag<OutputT> mainOutputTag,
+                           Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
+                               BroadcastHelper<?>>> sideInputs,
+                           WindowFn<Object, ?> windowFn) {
     this.accum = accum;
     this.mFunction = fn;
     this.mRuntimeContext = runtimeContext;
     this.mMainOutputTag = mainOutputTag;
     this.mSideInputs = sideInputs;
+    this.windowFn = windowFn;
   }
 
-  /**
-   * @param fn             DoFunction to be wrapped.
-   * @param runtimeContext Runtime to apply function in.
-   * @param mainOutputTag  The main output {@link TupleTag}.
-   * @param sideInputs     Side inputs used in DoFunction.
-   */
-  public MultiDoFnFunction(
-      OldDoFn<InputT, OutputT> fn,
-      SparkRuntimeContext runtimeContext,
-      TupleTag<OutputT> mainOutputTag,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this(null, fn, runtimeContext, mainOutputTag, sideInputs);
-  }
-
-
   @Override
   public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
       call(Iterator<WindowedValue<InputT>> iter) throws Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    mFunction.setup();
-    mFunction.startBundle(ctxt);
-    ctxt.setup();
-    return ctxt.getOutputIterable(iter, mFunction);
+    return new ProcCtxt(mFunction, mRuntimeContext, mSideInputs, windowFn)
+        .callWithCtxt(iter);
   }
 
   private class ProcCtxt
@@ -101,14 +90,11 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
 
-    ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(OutputT o) {
-      outputs.put(mMainOutputTag, windowedValue.withValue(o));
+    ProcCtxt(OldDoFn<InputT, OutputT> fn,
+             SparkRuntimeContext runtimeContext,
+             Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+             WindowFn<Object, ?> windowFn) {
+      super(fn, runtimeContext, sideInputs, windowFn);
     }
 
     @Override
@@ -118,21 +104,24 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     @Override
     public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      outputs.put(tag, windowedValue.withValue(t));
+      sideOutputWithTimestamp(tag, t, windowedValue != null ? windowedValue.getTimestamp() : null);
     }
 
     @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-      outputs.put(tupleTag, WindowedValue.of(t, instant,
-          windowedValue.getWindows(), windowedValue.getPane()));
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag,
+                                            final T t,
+                                            final Instant timestamp) {
+      if (windowedValue == null) {
+        // this is start/finishBundle.
+        outputs.put(tupleTag, noElementWindowedValue(t, timestamp, windowFn));
+      } else {
+        outputs.put(tupleTag, WindowedValue.of(t, timestamp, windowedValue.getWindows(),
+            windowedValue.getPane()));
+      }
     }
 
     @Override
     public Accumulator<NamedAggregators> getAccumulator() {
-      if (accum == null) {
-        throw new UnsupportedOperationException("SparkRunner does not provide Aggregator support "
-             + "for MultiDoFnFunction of type: " + mFunction.getClass().getCanonicalName());
-      }
       return accum;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
new file mode 100644
index 0000000..6aeb0db
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SparkSideInputReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+
+/**
+ * An abstract base class for the SparkRunner implementation of
+ * {@link org.apache.beam.sdk.transforms.Combine.CombineFn}.
+ */
+public class SparkAbstractCombineFn implements Serializable {
+  protected final SparkRuntimeContext runtimeContext;
+  protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+
+
+  public SparkAbstractCombineFn(SparkRuntimeContext runtimeContext,
+                                Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                                    sideInputs,
+                                WindowingStrategy<?, ?> windowingStrategy) {
+    this.runtimeContext = runtimeContext;
+    this.sideInputs = sideInputs;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  // each Spark task should get its own copy of this SparkKeyedCombineFn, and since Spark tasks
+  // are single-threaded, it is safe to reuse the context.
+  // the combine context is not Serializable so we'll use lazy initialization.
+  // ** DO NOT attempt to turn this into a Singleton as Spark may run multiple tasks in parallel
+  // in the same JVM (Executor). **
+  // ** DO NOT use combineContext directly inside this class, use ctxtForInput instead. **
+  private transient SparkCombineContext combineContext;
+  protected SparkCombineContext ctxtForInput(WindowedValue<?> input) {
+    if (combineContext == null) {
+      combineContext = new SparkCombineContext(runtimeContext.getPipelineOptions(),
+          new SparkSideInputReader(sideInputs));
+    }
+    return combineContext.forInput(input);
+  }
+
+  protected static <T> Iterable<WindowedValue<T>> sortByWindows(Iterable<WindowedValue<T>> iter) {
+    List<WindowedValue<T>> sorted = Lists.newArrayList(iter);
+    Collections.sort(sorted, new Comparator<WindowedValue<T>>() {
+      @Override
+      public int compare(WindowedValue<T> o1, WindowedValue<T> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp().compareTo(
+            Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+    return sorted;
+  }
+
+  protected static boolean isIntersecting(IntervalWindow union, IntervalWindow window) {
+    return union == null || union.intersects(window);
+  }
+
+  protected static IntervalWindow merge(IntervalWindow union, IntervalWindow window) {
+    return union == null ? window : union.span(window);
+  }
+
+  /**
+   * An implementation of {@link CombineWithContext.Context} for the SparkRunner.
+   */
+  private static class SparkCombineContext extends CombineWithContext.Context {
+    private final PipelineOptions pipelineOptions;
+    private final SideInputReader sideInputReader;
+
+    SparkCombineContext(PipelineOptions pipelineOptions, SideInputReader sideInputReader) {
+      this.pipelineOptions = pipelineOptions;
+      this.sideInputReader = sideInputReader;
+    }
+
+    private WindowedValue<?> input = null;
+
+    SparkCombineContext forInput(WindowedValue<?> input) {
+      this.input = input;
+      return this;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(input, "Input in SparkCombineContext must not be null!");
+      //validate element window.
+      final Collection<? extends BoundedWindow> elementWindows = input.getWindows();
+      checkState(elementWindows.size() == 1, "sideInput can only be called when the main "
+          + "input element is in exactly one window");
+      return sideInputReader.get(view, elementWindows.iterator().next());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 8127ddc..4877f6e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -47,7 +47,8 @@ public final class SparkContextFactory {
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     // reuse should be ignored if the context is provided.
     if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !options.getUsesProvidedSparkContext()) {
-      if (sparkContext == null) {
+      // if the context is null or stopped for some reason, re-create it.
+      if (sparkContext == null || sparkContext.sc().isStopped()) {
         sparkContext = createSparkContext(options);
         sparkMaster = options.getSparkMaster();
       } else if (!options.getSparkMaster().equals(sparkMaster)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
new file mode 100644
index 0000000..5339fb3
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+
+
+/**
+ * A {@link org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn}
+ * with a {@link CombineWithContext.Context} for the SparkRunner.
+ */
+public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
+  private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
+
+  public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
+                                  combineFn,
+                              SparkRuntimeContext runtimeContext,
+                              Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                                 sideInputs,
+                              WindowingStrategy<?, ?> windowingStrategy) {
+    super(runtimeContext, sideInputs, windowingStrategy);
+    this.combineFn = combineFn;
+  }
+
+  /**
+   * Implements Spark's zeroValue function in:
+   * <p>
+   * {@link org.apache.spark.api.java.JavaRDD#aggregate}.
+   * </p>
+   */
+  Iterable<WindowedValue<AccumT>> zeroValue() {
+    return Lists.newArrayList();
+  }
+
+  private Iterable<WindowedValue<AccumT>> createAccumulator(WindowedValue<InputT> input) {
+
+    // sort exploded inputs.
+    Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows());
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+    //--- inputs iterator, by window order.
+    final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
+    WindowedValue<InputT> currentInput = iterator.next();
+    BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
+
+    // first create the accumulator and accumulate first input.
+    AccumT accumulator = combineFn.createAccumulator(ctxtForInput(currentInput));
+    accumulator = combineFn.addInput(accumulator, currentInput.getValue(),
+        ctxtForInput(currentInput));
+
+    // keep track of the timestamps assigned by the OutputTimeFn.
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+
+    // accumulate the next windows, or output.
+    List<WindowedValue<AccumT>> output = Lists.newArrayList();
+
+    // if merging, merge overlapping windows, e.g. Sessions.
+    final boolean merging = !windowingStrategy.getWindowFn().isNonMerging();
+
+    while (iterator.hasNext()) {
+      WindowedValue<InputT> nextValue = iterator.next();
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+      boolean mergingAndIntersecting = merging
+          && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+
+      if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
+        if (mergingAndIntersecting) {
+          // merge intersecting windows.
+          currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+        }
+        // keep accumulating and carry on ;-)
+        accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
+            ctxtForInput(nextValue));
+        windowTimestamp = outputTimeFn.combine(windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+      } else {
+        // moving to the next window, first add the current accumulation to output
+        // and initialize the accumulator.
+        output.add(WindowedValue.of(accumulator, windowTimestamp, currentWindow,
+            PaneInfo.NO_FIRING));
+        // re-init accumulator, window and timestamp.
+        accumulator = combineFn.createAccumulator(ctxtForInput(nextValue));
+        accumulator = combineFn.addInput(accumulator, nextValue.getValue(),
+            ctxtForInput(nextValue));
+        currentWindow = nextWindow;
+        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
+    }
+
+    // add last accumulator to the output.
+    output.add(WindowedValue.of(accumulator, windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
+
+    return output;
+  }
+
+  /**
+   * Implements Spark's seqOp function in:
+   * <p>
+   * {@link org.apache.spark.api.java.JavaRDD#aggregate}.
+   * </p>
+   */
+  Iterable<WindowedValue<AccumT>> seqOp(Iterable<WindowedValue<AccumT>> accum,
+                                        WindowedValue<InputT> input) {
+    return combOp(accum, createAccumulator(input));
+  }
+
+  /**
+   * Implements Spark's combOp function in:
+   * <p>
+   * {@link org.apache.spark.api.java.JavaRDD#aggregate}.
+   * </p>
+   */
+  Iterable<WindowedValue<AccumT>> combOp(Iterable<WindowedValue<AccumT>> a1,
+                                         Iterable<WindowedValue<AccumT>> a2) {
+
+    // concatenate accumulators.
+    Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(a1, a2);
+    // if empty, return an empty accumulators iterable.
+    if (!accumulators.iterator().hasNext()) {
+      return Lists.newArrayList();
+    }
+
+    // sort accumulators, no need to explode since inputs were exploded.
+    Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+    //--- accumulators iterator, by window order.
+    final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
+
+    // get the first accumulator and assign it to the current window's accumulators.
+    WindowedValue<AccumT> currentValue = iterator.next();
+    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
+    List<AccumT> currentWindowAccumulators = Lists.newArrayList();
+    currentWindowAccumulators.add(currentValue.getValue());
+
+    // keep track of the timestamps assigned by the OutputTimeFn;
+    // createAccumulator already combined the timestamps assigned
+    // to individual elements, here we only need to merge them.
+    List<Instant> windowTimestamps = Lists.newArrayList();
+    windowTimestamps.add(currentValue.getTimestamp());
+
+    // accumulate the next windows, or output.
+    List<WindowedValue<AccumT>> output = Lists.newArrayList();
+
+    // if merging, merge overlapping windows, e.g. Sessions.
+    final boolean merging = !windowingStrategy.getWindowFn().isNonMerging();
+
+    while (iterator.hasNext()) {
+      WindowedValue<AccumT> nextValue = iterator.next();
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+      boolean mergingAndIntersecting = merging
+          && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+
+      if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
+        if (mergingAndIntersecting) {
+          // merge intersecting windows.
+          currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+        }
+        // add to window accumulators.
+        currentWindowAccumulators.add(nextValue.getValue());
+        windowTimestamps.add(nextValue.getTimestamp());
+      } else {
+        // before moving to the next window,
+        // add the current accumulation to the output and initialize the accumulation.
+
+        // merge the timestamps of all accumulators to merge.
+        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+
+        // merge accumulators.
+        // transforming the window's Iterable<AccumT> into a single AccumT
+        // for the (possibly merged) window.
+        Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
+        WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
+            accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
+        // applying the actual combiner onto the accumulators.
+        AccumT accumulated = combineFn.mergeAccumulators(accumsToMerge,
+            ctxtForInput(preMergeWindowedValue));
+        WindowedValue<AccumT> postMergeWindowedValue = preMergeWindowedValue.withValue(accumulated);
+        // emit the accumulated output.
+        output.add(postMergeWindowedValue);
+
+        // re-init accumulator, window and timestamps.
+        currentWindowAccumulators.clear();
+        currentWindowAccumulators.add(nextValue.getValue());
+        currentWindow = nextWindow;
+        windowTimestamps.clear();
+        windowTimestamps.add(nextValue.getTimestamp());
+      }
+    }
+
+    // merge the last chunk of accumulators.
+    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+    Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
+    WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of(
+        accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
+    AccumT accumulated = combineFn.mergeAccumulators(accumsToMerge,
+        ctxtForInput(preMergeWindowedValue));
+    WindowedValue<AccumT> postMergeWindowedValue = preMergeWindowedValue.withValue(accumulated);
+    output.add(postMergeWindowedValue);
+
+    return output;
+  }
+
+  Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<AccumT>> wvas) {
+    return Iterables.transform(wvas,
+        new Function<WindowedValue<AccumT>, WindowedValue<OutputT>>() {
+          @Nullable
+          @Override
+          public WindowedValue<OutputT> apply(@Nullable WindowedValue<AccumT> wva) {
+            if (wva == null) {
+              return null;
+            }
+            return wva.withValue(combineFn.extractOutput(wva.getValue(), ctxtForInput(wva)));
+          }
+        });
+  }
+}
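For reference, a minimal sketch of how the three callbacks above could be fed into Spark's
aggregate. This is illustrative only and not part of this commit: the class and method names
below are hypothetical, and it assumes a caller in the same package (zeroValue/seqOp/combOp
are package-private) with a serializable combine fn.

package org.apache.beam.runners.spark.translation;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

// Hypothetical wiring of SparkGlobalCombineFn into JavaRDD#aggregate.
class GlobalCombineSketch {
  static <InputT, AccumT, OutputT> Iterable<WindowedValue<OutputT>> combineGlobally(
      final SparkGlobalCombineFn<InputT, AccumT, OutputT> fn,
      JavaRDD<WindowedValue<InputT>> rdd) {
    Iterable<WindowedValue<AccumT>> accumulated = rdd.aggregate(
        fn.zeroValue(),                              // empty list of windowed accumulators
        new Function2<Iterable<WindowedValue<AccumT>>, WindowedValue<InputT>,
            Iterable<WindowedValue<AccumT>>>() {
          @Override
          public Iterable<WindowedValue<AccumT>> call(
              Iterable<WindowedValue<AccumT>> accum, WindowedValue<InputT> input) {
            return fn.seqOp(accum, input);           // fold one input into the accumulators
          }
        },
        new Function2<Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<AccumT>>,
            Iterable<WindowedValue<AccumT>>>() {
          @Override
          public Iterable<WindowedValue<AccumT>> call(
              Iterable<WindowedValue<AccumT>> a1, Iterable<WindowedValue<AccumT>> a2) {
            return fn.combOp(a1, a2);                // merge per-partition accumulators
          }
        });
    return fn.extractOutput(accumulated);            // extract one output per window
  }
}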

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7eecd7ee/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
new file mode 100644
index 0000000..910f7f0
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+
+
+/**
+ * A {@link org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn}
+ * with a {@link org.apache.beam.sdk.transforms.CombineWithContext.Context} for the SparkRunner.
+ */
+public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
+  private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+
+  public SparkKeyedCombineFn(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT,
+                                 OutputT> combineFn,
+                             SparkRuntimeContext runtimeContext,
+                             Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                                 sideInputs,
+                             WindowingStrategy<?, ?> windowingStrategy) {
+    super(runtimeContext, sideInputs, windowingStrategy);
+    this.combineFn = combineFn;
+  }
+
+  /** Applies the combine function directly to a key's grouped values - post grouping. */
+  public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
+    // apply combine function on grouped values.
+    return combineFn.apply(windowedKv.getValue().getKey(), windowedKv.getValue().getValue(),
+        ctxtForInput(windowedKv));
+  }
+
+  /**
+   * Implements Spark's createCombiner function in:
+   * <p>
+   * {@link org.apache.spark.rdd.PairRDDFunctions#combineByKey}.
+   * </p>
+   */
+  Iterable<WindowedValue<KV<K, AccumT>>> createCombiner(WindowedValue<KV<K, InputT>> wkvi) {
+    // sort exploded inputs.
+    Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+    //--- inputs iterator, by window order.
+    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
+    WindowedValue<KV<K, InputT>> currentInput = iterator.next();
+    BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
+
+    // first create the accumulator and accumulate first input.
+    K key = currentInput.getValue().getKey();
+    AccumT accumulator = combineFn.createAccumulator(key, ctxtForInput(currentInput));
+    accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(),
+        ctxtForInput(currentInput));
+
+    // keep track of the timestamps assigned by the OutputTimeFn.
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);
+
+    // accumulate the next windows, or output.
+    List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
+
+    // if merging, merge overlapping windows, e.g. Sessions.
+    final boolean merging = !windowingStrategy.getWindowFn().isNonMerging();
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+      boolean mergingAndIntersecting = merging
+          && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+
+      if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
+        if (mergingAndIntersecting) {
+          // merge intersecting windows.
+          currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+        }
+        // keep accumulating and carry on ;-)
+        accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
+            ctxtForInput(nextValue));
+        windowTimestamp = outputTimeFn.combine(windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
+      } else {
+        // moving to the next window, first add the current accumulation to output
+        // and initialize the accumulator.
+        output.add(WindowedValue.of(KV.of(key, accumulator), windowTimestamp, currentWindow,
+            PaneInfo.NO_FIRING));
+        // re-init accumulator, window and timestamp.
+        accumulator = combineFn.createAccumulator(key, ctxtForInput(nextValue));
+        accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(),
+            ctxtForInput(nextValue));
+        currentWindow = nextWindow;
+        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
+    }
+
+    // add last accumulator to the output.
+    output.add(WindowedValue.of(KV.of(key, accumulator), windowTimestamp, currentWindow,
+        PaneInfo.NO_FIRING));
+
+    return output;
+  }
+
+  /**
+   * Implements Spark's mergeValue function in:
+   * <p>
+   * {@link org.apache.spark.rdd.PairRDDFunctions#combineByKey}.
+   * </p>
+   */
+  Iterable<WindowedValue<KV<K, AccumT>>> mergeValue(WindowedValue<KV<K, InputT>> wkvi,
+                                                    Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
+    // by calling createCombiner on the input and only afterwards merging the accumulators, we
+    // avoid an explode-and-accumulate over the input, which would perform poorly - O(n^2):
+    // first sort the exploded input - O(nlogn),
+    // followed by a sort of the accumulators - O(mlogm),
+    // then, for each (exploded) input, find a matching accumulator (if one exists) to merge into,
+    // or create a new one - O(n*m),
+    // which adds up to O(nlogn) + O(mlogm) + O(n*m) ~> O(n^2).
+    // instead, createCombiner builds accumulators from the input - O(nlogn) + O(n),
+    // and mergeCombiners then results in O((n+m)log(n+m)) + O(n+m) ~> O(nlogn).
+    return mergeCombiners(createCombiner(wkvi), wkvas);
+  }
+
+  /**
+   * Implements Spark's mergeCombiners function in:
+   * <p>
+   * {@link org.apache.spark.rdd.PairRDDFunctions#combineByKey}.
+   * </p>
+   */
+  Iterable<WindowedValue<KV<K, AccumT>>> mergeCombiners(Iterable<WindowedValue<KV<K, AccumT>>> a1,
+                                                        Iterable<WindowedValue<KV<K, AccumT>>> a2) {
+    // concatenate accumulators.
+    Iterable<WindowedValue<KV<K, AccumT>>> accumulators = Iterables.concat(a1, a2);
+
+    // sort accumulators, no need to explode since inputs were exploded.
+    Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
+
+    //--- accumulators iterator, by window order.
+    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
+
+    // get the first accumulator and assign it to the current window's accumulators.
+    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
+    List<AccumT> currentWindowAccumulators = Lists.newArrayList();
+    currentWindowAccumulators.add(currentValue.getValue().getValue());
+
+    // keep track of the timestamps assigned by the OutputTimeFn;
+    // createCombiner already combined the timestamps assigned
+    // to individual elements, here we only need to merge them.
+    List<Instant> windowTimestamps = Lists.newArrayList();
+    windowTimestamps.add(currentValue.getTimestamp());
+
+    // accumulate the next windows, or output.
+    List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
+
+    // if merging, merge overlapping windows, e.g. Sessions.
+    final boolean merging = !windowingStrategy.getWindowFn().isNonMerging();
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+      boolean mergingAndIntersecting = merging
+          && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+
+      if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
+        if (mergingAndIntersecting) {
+          // merge intersecting windows.
+          currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
+        }
+        // add to window accumulators.
+        currentWindowAccumulators.add(nextValue.getValue().getValue());
+        windowTimestamps.add(nextValue.getTimestamp());
+      } else {
+        // before moving to the next window,
+        // add the current accumulation to the output and initialize the accumulation.
+
+        // merge the timestamps of all accumulators to merge.
+        Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+
+        // merge accumulators.
+        // transforming a KV<K, Iterable<AccumT>> into a KV<K, AccumT>
+        // for the (possibly merged) window.
+        Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
+        WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
+            KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
+        // applying the actual combiner onto the accumulators.
+        AccumT accumulated = combineFn.mergeAccumulators(key, accumsToMerge,
+            ctxtForInput(preMergeWindowedValue));
+        WindowedValue<KV<K, AccumT>> postMergeWindowedValue =
+            preMergeWindowedValue.withValue(KV.of(key, accumulated));
+        // emit the accumulated output.
+        output.add(postMergeWindowedValue);
+
+        // re-init accumulator, window and timestamps.
+        currentWindowAccumulators.clear();
+        currentWindowAccumulators.add(nextValue.getValue().getValue());
+        currentWindow = nextWindow;
+        windowTimestamps.clear();
+        windowTimestamps.add(nextValue.getTimestamp());
+      }
+    }
+
+    // merge the last chunk of accumulators.
+    Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps);
+    Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
+    WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of(
+        KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING);
+    AccumT accumulated = combineFn.mergeAccumulators(key, accumsToMerge,
+        ctxtForInput(preMergeWindowedValue));
+    WindowedValue<KV<K, AccumT>> postMergeWindowedValue =
+        preMergeWindowedValue.withValue(KV.of(key, accumulated));
+    output.add(postMergeWindowedValue);
+
+    return output;
+  }
+
+  Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
+    return Iterables.transform(wkvas,
+        new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
+          @Nullable
+          @Override
+          public WindowedValue<OutputT> apply(@Nullable WindowedValue<KV<K, AccumT>> wkva) {
+            if (wkva == null) {
+              return null;
+            }
+            K key = wkva.getValue().getKey();
+            AccumT accumulator = wkva.getValue().getValue();
+            return wkva.withValue(combineFn.extractOutput(key, accumulator, ctxtForInput(wkva)));
+          }
+        });
+  }
+}
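
Similarly, a minimal sketch of how createCombiner/mergeValue/mergeCombiners could back
Spark's combineByKey. Again illustrative only, not part of this commit: the class and method
names are hypothetical, and it assumes a same-package caller since the combine fn's methods
are package-private.

package org.apache.beam.runners.spark.translation;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

// Hypothetical wiring of SparkKeyedCombineFn into JavaPairRDD#combineByKey.
class KeyedCombineSketch {
  static <K, InputT, AccumT, OutputT> JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>>
      accumulatePerKey(
          final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> fn,
          JavaPairRDD<K, WindowedValue<KV<K, InputT>>> rdd) {
    // fn.extractOutput(...) would then be mapped over the resulting per-key accumulators.
    return rdd.combineByKey(
        new Function<WindowedValue<KV<K, InputT>>, Iterable<WindowedValue<KV<K, AccumT>>>>() {
          @Override
          public Iterable<WindowedValue<KV<K, AccumT>>> call(WindowedValue<KV<K, InputT>> wkvi) {
            return fn.createCombiner(wkvi);       // first value for this key in a partition
          }
        },
        new Function2<Iterable<WindowedValue<KV<K, AccumT>>>, WindowedValue<KV<K, InputT>>,
            Iterable<WindowedValue<KV<K, AccumT>>>>() {
          @Override
          public Iterable<WindowedValue<KV<K, AccumT>>> call(
              Iterable<WindowedValue<KV<K, AccumT>>> accums, WindowedValue<KV<K, InputT>> wkvi) {
            return fn.mergeValue(wkvi, accums);   // fold another value into the accumulators
          }
        },
        new Function2<Iterable<WindowedValue<KV<K, AccumT>>>,
            Iterable<WindowedValue<KV<K, AccumT>>>, Iterable<WindowedValue<KV<K, AccumT>>>>() {
          @Override
          public Iterable<WindowedValue<KV<K, AccumT>>> call(
              Iterable<WindowedValue<KV<K, AccumT>>> a1, Iterable<WindowedValue<KV<K, AccumT>>> a2) {
            return fn.mergeCombiners(a1, a2);     // merge accumulators across partitions
          }
        });
  }
}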