Posted to commits@beam.apache.org by am...@apache.org on 2016/09/21 17:25:06 UTC

[1/4] incubator-beam git commit: [BEAM-610] Enable Spark's checkpointing mechanism for driver-failure recovery in streaming.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5c23f4954 -> 1ceb12aeb


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
new file mode 100644
index 0000000..beaae13
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearAggregatorsRule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
+ * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
+ */
+class ClearAggregatorsRule extends ExternalResource {
+  @Override
+  protected void before() throws Throwable {
+    AccumulatorSingleton.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 8b7762f..238d7ba 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
@@ -53,6 +54,9 @@ public class SimpleWordCountTest {
   @Rule
   public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
+  @Rule
+  public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
+
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob",
       "hi sue", "", "bob hi"};

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 0d15d12..f85baab 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -67,8 +67,7 @@ public class SideEffectsTest implements Serializable {
 
       // TODO: remove the version check (and the setup and teardown methods) when we no
       // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(),
-          options.getAppName()).version();
+      String version = SparkContextFactory.getSparkContext(options).version();
       if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
         assertTrue(e.getCause() instanceof UserException);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index a6fe755..8210b0d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -22,19 +22,21 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Test Flatten (union) implementation for streaming.
@@ -51,26 +53,50 @@ public class FlattenStreamingTest {
           Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
   private static final String[] EXPECTED_UNION = {
           "one", "two", "three", "four", "five", "six", "seven", "eight"};
-  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
   @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    // using the default 1000 msec interval
-    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
+  public void testFlattenUnbounded() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
+
     Pipeline p = Pipeline.create(options);
+    PCollection<String> w1 =
+        p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedW1 =
+        w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollection<String> w2 =
+        p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedW2 =
+        w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
+    PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
+    PAssertStreaming.assertContents(union, EXPECTED_UNION);
+
+    EvaluationResult res = (EvaluationResult) p.run();
+    res.close();
+  }
+
+  @Test
+  public void testFlattenBoundedUnbounded() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
+
+    Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
-            p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
+        p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
     PCollection<String> windowedW1 =
-            w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+        w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
     PCollection<String> w2 =
-            p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
+        p.apply(Create.of(WORDS_ARRAY_2)).setCoder(StringUtf8Coder.of());
     PCollection<String> windowedW2 =
-            w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
+        w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index ac77922..caf5d13 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -25,14 +25,13 @@ import java.util.Properties;
 import kafka.serializer.StringDecoder;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -43,11 +42,13 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.streaming.Durations;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
 /**
  * Test Kafka as input.
  */
@@ -61,7 +62,6 @@ public class KafkaStreamingTest {
       "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
   );
   private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
-  private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @BeforeClass
   public static void init() throws IOException {
@@ -82,22 +82,22 @@ public class KafkaStreamingTest {
     }
   }
 
+  @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+
   @Test
   public void testRun() throws Exception {
-    // test read from Kafka
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
-    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
-    Pipeline p = Pipeline.create(options);
-
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
     Map<String, String> kafkaParams = ImmutableMap.of(
         "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
         "auto.offset.reset", "smallest"
     );
 
+    Pipeline p = Pipeline.create(options);
     PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
         StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
         kafkaParams))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
new file mode 100644
index 0000000..4a96690
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import kafka.serializer.StringDecoder;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
+import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint).
+ *
+ * Tests Aggregators, which rely on Accumulators - Aggregators should be available, though state
+ * is not preserved (Spark issue), so they start from initial value.
+ * //TODO: after the runner supports recovering the state of Aggregators, update this test's
+ * expected values for the recovered (second) run.
+ */
+public class RecoverFromCheckpointStreamingTest {
+  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
+      new EmbeddedKafkaCluster.EmbeddedZookeeper();
+  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
+      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
+  private static final String TOPIC = "kafka_beam_test_topic";
+  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
+      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+  );
+  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
+  private static final long EXPECTED_AGG_FIRST = 4L;
+
+  @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    EMBEDDED_ZOOKEEPER.startup();
+    EMBEDDED_KAFKA_CLUSTER.startup();
+    // this test actually requires NOT reusing the context, but rather stopping it and starting
+    // again from the checkpoint with a brand new context.
+    System.setProperty("beam.spark.test.reuseSparkContext", "false");
+    // write to Kafka
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("request.required.acks", 1);
+    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    Serializer<String> stringSerializer = new StringSerializer();
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
+        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
+      for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+        kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+      }
+      kafkaProducer.close();
+    }
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
+
+    // checkpoint after first (and only) interval.
+    options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
+
+    // first run will read from Kafka backlog - "auto.offset.reset=smallest"
+    EvaluationResult res = run(options);
+    res.close();
+    long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class);
+    assertThat(String.format("Expected %d processed messages count but "
+        + "found %d", EXPECTED_AGG_FIRST, processedMessages1), processedMessages1,
+            equalTo(EXPECTED_AGG_FIRST));
+
+    // recovery should resume from last read offset, so nothing is read here.
+    res = runAgain(options);
+    res.close();
+    long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
+    assertThat(String.format("Expected %d processed messages count but "
+        + "found %d", 0, processedMessages2), processedMessages2, equalTo(0L));
+  }
+
+  private static EvaluationResult runAgain(SparkPipelineOptions options) {
+    AccumulatorSingleton.clear();
+    // sleep before next run.
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    return run(options);
+  }
+
+  private static EvaluationResult run(SparkPipelineOptions options) {
+    Map<String, String> kafkaParams = ImmutableMap.of(
+            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+            "auto.offset.reset", "smallest"
+    );
+    Pipeline p = Pipeline.create(options);
+    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(
+        StringDecoder.class, StringDecoder.class, String.class, String.class,
+            Collections.singleton(TOPIC), kafkaParams)).setCoder(KvCoder.of(StringUtf8Coder.of(),
+                StringUtf8Coder.of()));
+    PCollection<KV<String, String>> windowedWords = kafkaInput
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(
+        new FormatAsText()));
+
+    PAssertStreaming.assertContents(formattedKV, EXPECTED);
+
+    return  (EvaluationResult) p.run();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EMBEDDED_KAFKA_CLUSTER.shutdown();
+    EMBEDDED_ZOOKEEPER.shutdown();
+  }
+
+  private static class FormatAsText extends DoFn<KV<String, String>, String> {
+
+    private final Aggregator<Long, Long> aggregator =
+        createAggregator("processedMessages", new Sum.SumLongFn());
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      aggregator.addValue(1L);
+      String formatted = c.element().getKey() + "," + c.element().getValue();
+      c.output(formatted);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 671d227..1464273 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -24,20 +24,21 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.streaming.Durations;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
 
 /**
  * Simple word count streaming test.
@@ -49,23 +50,23 @@ public class SimpleStreamingWordCountTest implements Serializable {
   private static final List<Iterable<String>> WORDS_QUEUE =
       Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
   private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 2", "bob: 2"};
-  private static final long TEST_TIMEOUT_MSEC = 1000L;
+
+  @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
-    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
-    Pipeline p = Pipeline.create(options);
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
 
+    Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords =
         p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
     PCollection<String> windowedWords = inputWords
         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-
     PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
new file mode 100644
index 0000000..d695df0
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+
+import java.io.File;
+import java.net.MalformedURLException;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.rules.ExternalResource;
+
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
+ */
+public class TestOptionsForStreaming extends ExternalResource {
+  private final SparkPipelineOptions options =
+      PipelineOptionsFactory.as(SparkPipelineOptions.class);
+
+  @Override
+  protected void before() throws Throwable {
+    options.setRunner(SparkRunner.class);
+    options.setStreaming(true);
+    options.setTimeout(1000L);
+  }
+
+  public SparkPipelineOptions withTmpCheckpointDir(File checkpointDir)
+      throws MalformedURLException {
+    // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
+    // so need to add the missing protocol.
+    options.setCheckpointDir(checkpointDir.toURI().toURL().toString());
+    return options;
+  }
+
+  public SparkPipelineOptions getOptions() {
+    return options;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2b89372..a00dcba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1410,6 +1410,13 @@ public class Combine {
           ImmutableList.copyOf(sideInputs));
     }
 
+    /**
+     * Returns the {@link GlobalCombineFn} used by this Combine operation.
+     */
+    public GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
+      return fn;
+    }
+
     @Override
     public PCollection<OutputT> apply(PCollection<InputT> input) {
       PCollection<KV<Void, InputT>> withKeys = input


[4/4] incubator-beam git commit: This closes #909

Posted by am...@apache.org.
This closes #909


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ceb12ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ceb12ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ceb12ae

Branch: refs/heads/master
Commit: 1ceb12aebd0ffa63bd28d31cbe830230713705ec
Parents: 5c23f49 0feb649
Author: Sela <an...@paypal.com>
Authored: Wed Sep 21 20:17:38 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Sep 21 20:17:38 2016 +0300

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  28 +-
 .../apache/beam/runners/spark/SparkRunner.java  | 121 ++--
 .../spark/aggregators/AccumulatorSingleton.java |  53 ++
 .../runners/spark/translation/DoFnFunction.java |  35 +-
 .../spark/translation/EvaluationContext.java    |  17 +-
 .../translation/GroupCombineFunctions.java      | 262 +++++++++
 .../spark/translation/MultiDoFnFunction.java    |  44 +-
 .../spark/translation/SparkContextFactory.java  |  48 +-
 .../translation/SparkPipelineEvaluator.java     |  57 --
 .../translation/SparkPipelineTranslator.java    |   5 +-
 .../spark/translation/SparkProcessContext.java  |  10 +-
 .../spark/translation/SparkRuntimeContext.java  |  44 +-
 .../spark/translation/TransformTranslator.java  | 473 +++-------------
 .../spark/translation/TranslationUtils.java     | 195 +++++++
 .../SparkRunnerStreamingContextFactory.java     |  98 ++++
 .../streaming/StreamingEvaluationContext.java   |  44 +-
 .../streaming/StreamingTransformTranslator.java | 549 ++++++++++++-------
 .../runners/spark/util/BroadcastHelper.java     |  26 +
 .../runners/spark/ClearAggregatorsRule.java     |  33 ++
 .../beam/runners/spark/SimpleWordCountTest.java |   4 +
 .../spark/translation/SideEffectsTest.java      |   3 +-
 .../streaming/FlattenStreamingTest.java         |  54 +-
 .../streaming/KafkaStreamingTest.java           |  26 +-
 .../RecoverFromCheckpointStreamingTest.java     | 179 ++++++
 .../streaming/SimpleStreamingWordCountTest.java |  25 +-
 .../utils/TestOptionsForStreaming.java          |  55 ++
 .../org/apache/beam/sdk/transforms/Combine.java |   7 +
 27 files changed, 1682 insertions(+), 813 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: [BEAM-610] Enable Spark's checkpointing mechanism for driver-failure recovery in streaming.

Posted by am...@apache.org.
[BEAM-610] Enable Spark's checkpointing mechanism for driver-failure recovery in streaming.

Refactor translation mechanism to support checkpointing of DStream.

Support basic functionality with GroupByKey and ParDo.

Added support for grouping operations.

Added checkpointDir option, using it before execution.

Support Accumulator recovery from checkpoint.

Streaming tests should use JUnit's TemporaryFolder Rule for checkpoint directory.

Support combine optimizations.

Support durable sideInput via Broadcast.

Branches in the pipeline are either Bounded or Unbounded and should be handled as such.

Handle flatten/union of Bounded/Unbounded RDD/DStream.

JavaDoc

Rebased on master.

Reuse functionality between batch and streaming translators

Better implementation of streaming/batch pipeline-branch translation.

Move group/combine functions to their own wrapping class.

Fixed missing licenses.

Use VisibleForTesting annotation instead of comment.

Remove Broadcast failure recovery, to be handled separately.

Stop streaming gracefully, so any checkpointing will finish first.

typo + better documentation.

Validate checkpointDir durability.

Add checkpoint duration option.

A more compact streaming-test initialization using Rules.

A more accurate test; removed broadcast from the test as it will be handled separately.

Bounded/Unbounded translation to be handled by the SparkPipelineTranslator implementation. The Evaluator
decides whether to use translateBounded or translateUnbounded according to the visited node's boundedness.
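
A minimal sketch of how a user might opt a streaming pipeline into the new checkpointing
support, assuming the options introduced by this change (checkpointDir, checkpointDurationMillis)
together with the existing batch-interval and timeout options; the checkpoint path is
illustrative only:

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    // a reliable filesystem (HDFS/S3/GS) is needed for real durability
    options.setCheckpointDir("hdfs:///beam/checkpoints/my-job");
    // checkpoint once per batch interval; if unset, Spark falls back to its own default
    options.setCheckpointDurationMillis(options.getBatchIntervalMillis());

    Pipeline p = Pipeline.create(options);
    // ... apply sources and transforms ...
    EvaluationResult res = (EvaluationResult) p.run();
    res.close();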


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0feb6499
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0feb6499
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0feb6499

Branch: refs/heads/master
Commit: 0feb64994a05de4fe6b1ba178a38d03743b89b7a
Parents: 5c23f49
Author: Sela <an...@paypal.com>
Authored: Thu Aug 25 23:49:01 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Sep 21 20:15:27 2016 +0300

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  28 +-
 .../apache/beam/runners/spark/SparkRunner.java  | 121 ++--
 .../spark/aggregators/AccumulatorSingleton.java |  53 ++
 .../runners/spark/translation/DoFnFunction.java |  35 +-
 .../spark/translation/EvaluationContext.java    |  17 +-
 .../translation/GroupCombineFunctions.java      | 262 +++++++++
 .../spark/translation/MultiDoFnFunction.java    |  44 +-
 .../spark/translation/SparkContextFactory.java  |  48 +-
 .../translation/SparkPipelineEvaluator.java     |  57 --
 .../translation/SparkPipelineTranslator.java    |   5 +-
 .../spark/translation/SparkProcessContext.java  |  10 +-
 .../spark/translation/SparkRuntimeContext.java  |  44 +-
 .../spark/translation/TransformTranslator.java  | 473 +++-------------
 .../spark/translation/TranslationUtils.java     | 195 +++++++
 .../SparkRunnerStreamingContextFactory.java     |  98 ++++
 .../streaming/StreamingEvaluationContext.java   |  44 +-
 .../streaming/StreamingTransformTranslator.java | 549 ++++++++++++-------
 .../runners/spark/util/BroadcastHelper.java     |  26 +
 .../runners/spark/ClearAggregatorsRule.java     |  33 ++
 .../beam/runners/spark/SimpleWordCountTest.java |   4 +
 .../spark/translation/SideEffectsTest.java      |   3 +-
 .../streaming/FlattenStreamingTest.java         |  54 +-
 .../streaming/KafkaStreamingTest.java           |  26 +-
 .../RecoverFromCheckpointStreamingTest.java     | 179 ++++++
 .../streaming/SimpleStreamingWordCountTest.java |  25 +-
 .../utils/TestOptionsForStreaming.java          |  55 ++
 .../org/apache/beam/sdk/transforms/Combine.java |   7 +
 27 files changed, 1682 insertions(+), 813 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index db6b75c..7afb68c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,9 +19,9 @@
 package org.apache.beam.runners.spark;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
@@ -48,6 +48,32 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   Long getBatchIntervalMillis();
   void setBatchIntervalMillis(Long batchInterval);
 
+  @Description("A checkpoint directory for streaming resilience, ignored in batch. "
+      + "For durability, a reliable filesystem such as HDFS/S3/GS is necessary.")
+  @Default.InstanceFactory(TmpCheckpointDirFactory.class)
+  String getCheckpointDir();
+  void setCheckpointDir(String checkpointDir);
+
+  /**
+   * Returns the default checkpoint directory of /tmp/${job.name}.
+   * For testing purposes only. Production applications should use a reliable
+   * filesystem such as HDFS/S3/GS.
+   */
+  static class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
+      return "file:///tmp/" + sparkPipelineOptions.getJobName();
+    }
+  }
+
+  @Description("The period to checkpoint (in Millis). If not set, Spark will default "
+      + "to Max(slideDuration, Seconds(10)). This PipelineOptions default (-1) will end up "
+          + "with the described Spark default.")
+  @Default.Long(-1)
+  Long getCheckpointDurationMillis();
+  void setCheckpointDurationMillis(Long durationMillis);
+
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkSinks();
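
Since Beam derives command-line flags from PipelineOptions getters, the new options should also
be settable from the command line, roughly as follows (values illustrative):

    --runner=SparkRunner --streaming=true \
    --checkpointDir=hdfs:///beam/checkpoints/my-job \
    --checkpointDurationMillis=1000 \
    --batchIntervalMillis=1000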

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 03db811..63dfe0d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,15 +18,16 @@
 
 package org.apache.beam.runners.spark;
 
+import java.util.Collection;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkProcessContext;
+import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
 import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,15 +35,17 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,40 +146,27 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");
-      JavaSparkContext jsc;
-      if (mOptions.getUsesProvidedSparkContext()) {
-        LOG.info("Using a provided Spark Context");
-        jsc = mOptions.getProvidedSparkContext();
-        if (jsc == null || jsc.sc().isStopped()){
-          LOG.error("The provided Spark context "
-                  + jsc + " was not created or was stopped");
-          throw new RuntimeException("The provided Spark context was not created or was stopped");
-        }
-      } else {
-        LOG.info("Creating a new Spark Context");
-        jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName());
-      }
-      if (mOptions.isStreaming()) {
-        SparkPipelineTranslator translator =
-            new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
-        Duration batchInterval = new Duration(mOptions.getBatchIntervalMillis());
-        LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
 
-        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
-        ctxt.computeOutputs();
+      if (mOptions.isStreaming()) {
+        SparkRunnerStreamingContextFactory contextFactory =
+            new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(),
+            contextFactory);
 
-        LOG.info("Streaming pipeline construction complete. Starting execution..");
-        ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
+        LOG.info("Starting streaming pipeline execution.");
+        jssc.start();
 
-        return ctxt;
+        // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
+        return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(),
+            pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
       } else {
         if (mOptions.getTimeout() > 0) {
           LOG.info("Timeout is ignored by the SparkRunner in batch.");
         }
+        JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
         EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
         SparkPipelineTranslator translator = new TransformTranslator.Translator();
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+        pipeline.traverseTopologically(new Evaluator(translator, ctxt));
         ctxt.computeOutputs();
 
         LOG.info("Pipeline execution complete.");
@@ -202,23 +192,18 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
     }
   }
 
-  private EvaluationContext
-      createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      Duration batchDuration) {
-    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    return new StreamingEvaluationContext(jsc, pipeline, jssc, mOptions.getTimeout());
-  }
-
   /**
    * Evaluator on the pipeline.
    */
-  public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
-    protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
+  public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
 
-    protected final SparkPipelineTranslator translator;
+    private final EvaluationContext ctxt;
+    private final SparkPipelineTranslator translator;
 
-    protected Evaluator(SparkPipelineTranslator translator) {
+    public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
+      this.ctxt = ctxt;
     }
 
     @Override
@@ -242,8 +227,62 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
       doVisitTransform(node);
     }
 
-    protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformTreeNode node);
+    <TransformT extends PTransform<? super PInput, POutput>> void
+        doVisitTransform(TransformTreeNode node) {
+      @SuppressWarnings("unchecked")
+      TransformT transform = (TransformT) node.getTransform();
+      @SuppressWarnings("unchecked")
+      Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
+      @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
+          translate(node, transform, transformClass);
+      LOG.info("Evaluating {}", transform);
+      AppliedPTransform<PInput, POutput, TransformT> appliedTransform =
+          AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
+      ctxt.setCurrentTransform(appliedTransform);
+      evaluator.evaluate(transform, ctxt);
+      ctxt.setCurrentTransform(null);
+    }
+
+    /**
+     *  Determine if this Node belongs to a Bounded branch of the pipeline, or Unbounded, and
+     *  translate with the proper translator.
+     */
+    private <TransformT extends PTransform<? super PInput, POutput>> TransformEvaluator<TransformT>
+        translate(TransformTreeNode node, TransformT transform, Class<TransformT> transformClass) {
+      //--- determine if node is bounded/unbounded.
+      // usually, the input determines if the PCollection to apply the next transformation to
+      // is BOUNDED or UNBOUNDED, meaning RDD/DStream.
+      Collection<? extends PValue> pValues;
+      PInput pInput = node.getInput();
+      if (pInput instanceof PBegin) {
+        // in case of a PBegin, it's the output.
+        pValues = node.getOutput().expand();
+      } else {
+        pValues = pInput.expand();
+      }
+      PCollection.IsBounded isNodeBounded = isBoundedCollection(pValues);
+      // translate accordingly.
+      LOG.debug("Translating {} as {}", transform, isNodeBounded);
+      return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
+          ? translator.translateBounded(transformClass)
+              : translator.translateUnbounded(transformClass);
+    }
+
+    private PCollection.IsBounded isBoundedCollection(Collection<? extends PValue> pValues) {
+      // anything that is not a PCollection, is BOUNDED.
+      // For PCollections:
+      // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
+      // while BOUNDED + UNBOUNDED = UNBOUNDED.
+      PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
+      for (PValue pValue: pValues) {
+        if (pValue instanceof PCollection) {
+          isBounded = isBounded.and(((PCollection) pValue).isBounded());
+        } else {
+          isBounded = isBounded.and(PCollection.IsBounded.BOUNDED);
+        }
+      }
+      return isBounded;
+    }
   }
 }
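
The recovery contract the runner now leans on is Spark's getOrCreate: if a checkpoint exists
under checkpointDir, the streaming context (including the translated DStream graph) is
reconstituted from it and the factory is never invoked; otherwise the factory builds a fresh,
checkpointed context. A rough sketch of that pattern, with timeout and shutdown handling
simplified:

    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
        options.getCheckpointDir(),                                  // look for a checkpoint here
        new SparkRunnerStreamingContextFactory(pipeline, options));  // only called if none exists
    jssc.start();
    jssc.awaitTerminationOrTimeout(options.getTimeout());
    // stop gracefully so any in-flight checkpointing completes first
    jssc.stop(false, true);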
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
new file mode 100644
index 0000000..758372e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AccumulatorSingleton.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton.
+ * @see <a href="https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
+ */
+public class AccumulatorSingleton {
+
+  private static volatile Accumulator<NamedAggregators> instance = null;
+
+  public static Accumulator<NamedAggregators> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (AccumulatorSingleton.class) {
+        if (instance == null) {
+          //TODO: currently when recovering from checkpoint, Spark does not recover the
+          // last known Accumulator value. The SparkRunner should be able to persist and recover
+          // the NamedAggregators in order to recover Aggregators as well.
+          instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam());
+        }
+      }
+    }
+    return instance;
+  }
+
+  @VisibleForTesting
+  public static void clear() {
+    synchronized (AccumulatorSingleton.class) {
+      instance = null;
+    }
+  }
+}
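
A sketch of the intended use (mirroring the DoFnFunction change further down): the accumulator is
looked up lazily through the singleton at translation time, so a run recovered from a checkpoint,
which executes on a brand new SparkContext, registers a fresh accumulator against that context
instead of reusing a stale serialized reference. The doFn, runtimeContext and sideInputs below
are placeholders:

    Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
    DoFnFunction<String, String> fn =
        new DoFnFunction<>(accum, doFn, runtimeContext, sideInputs);
    // tests (and deliberately restarted runs) can drop the cached instance:
    AccumulatorSingleton.clear();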

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 454b760..79639a2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -22,10 +22,12 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +39,8 @@ import org.slf4j.LoggerFactory;
  * @param <OutputT> Output element type.
  */
 public class DoFnFunction<InputT, OutputT>
-    implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
-    WindowedValue<OutputT>> {
+    implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
+  private final Accumulator<NamedAggregators> accum;
   private final OldDoFn<InputT, OutputT> mFunction;
   private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class);
 
@@ -46,18 +48,32 @@ public class DoFnFunction<InputT, OutputT>
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
   /**
+   * @param accum      The Spark Accumulator that handles the Beam Aggregators.
    * @param fn         DoFunction to be wrapped.
    * @param runtime    Runtime to apply function in.
    * @param sideInputs Side inputs used in DoFunction.
    */
-  public DoFnFunction(OldDoFn<InputT, OutputT> fn,
-               SparkRuntimeContext runtime,
-               Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+  public DoFnFunction(Accumulator<NamedAggregators> accum,
+                      OldDoFn<InputT, OutputT> fn,
+                      SparkRuntimeContext runtime,
+                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this.accum = accum;
     this.mFunction = fn;
     this.mRuntimeContext = runtime;
     this.mSideInputs = sideInputs;
   }
 
+  /**
+   * @param fn         DoFunction to be wrapped.
+   * @param runtime    Runtime to apply function in.
+   * @param sideInputs Side inputs used in DoFunction.
+   */
+  public DoFnFunction(OldDoFn<InputT, OutputT> fn,
+                      SparkRuntimeContext runtime,
+                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this(null, fn, runtime, sideInputs);
+  }
+
   @Override
   public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws
       Exception {
@@ -103,6 +119,15 @@ public class DoFnFunction<InputT, OutputT>
     }
 
     @Override
+    public Accumulator<NamedAggregators> getAccumulator() {
+      if (accum == null) {
+        throw new UnsupportedOperationException("SparkRunner does not provide Aggregator support "
+             + "for DoFnFunction of type: " + mFunction.getClass().getCanonicalName());
+      }
+      return accum;
+    }
+
+    @Override
     protected void clearOutput() {
       outputs.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 724f54f..2397276 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -68,7 +69,7 @@ public class EvaluationContext implements EvaluationResult {
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
     this.pipeline = pipeline;
-    this.runtime = new SparkRuntimeContext(jsc, pipeline);
+    this.runtime = new SparkRuntimeContext(pipeline, jsc);
   }
 
   /**
@@ -136,7 +137,7 @@ public class EvaluationContext implements EvaluationResult {
     return jsc;
   }
 
-  protected Pipeline getPipeline() {
+  public Pipeline getPipeline() {
     return pipeline;
   }
 
@@ -144,7 +145,7 @@ public class EvaluationContext implements EvaluationResult {
     return runtime;
   }
 
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
     this.currentTransform = transform;
   }
 
@@ -178,7 +179,7 @@ public class EvaluationContext implements EvaluationResult {
     pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
   }
 
-  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
+  public void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
     pview.put(view, value);
   }
 
@@ -187,7 +188,7 @@ public class EvaluationContext implements EvaluationResult {
     return pcollections.containsKey(pvalue);
   }
 
-  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
+  public JavaRDDLike<?, ?> getRDD(PValue pvalue) {
     RDDHolder<?> rddHolder = pcollections.get(pvalue);
     JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
     leafRdds.remove(rddHolder);
@@ -211,7 +212,7 @@ public class EvaluationContext implements EvaluationResult {
     leafRdds.add(rddHolder);
   }
 
-  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
+  protected JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
     return getRDD((PValue) getInput(transform));
   }
 
@@ -252,13 +253,13 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(named, resultType);
+    return runtime.getAggregatorValue(AccumulatorSingleton.getInstance(jsc), named, resultType);
   }
 
   @Override
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
       throws AggregatorRetrievalException {
-    return runtime.getAggregatorValues(aggregator);
+    return runtime.getAggregatorValues(AccumulatorSingleton.getInstance(jsc), aggregator);
   }
 
   @Override

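The practical effect is that aggregator values are now read from the per-JavaSparkContext accumulator singleton rather than from state held by the runtime context. A hedged usage sketch; the aggregator name "emptyLines" is invented for illustration, and the cast relies on the SparkRunner returning an EvaluationResult:

    EvaluationResult result = (EvaluationResult) pipeline.run();
    Long emptyLines = result.getAggregatorValue("emptyLines", Long.class);
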
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
new file mode 100644
index 0000000..eb4002e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+/**
+ * A set of group/combine functions to apply to Spark {@link org.apache.spark.rdd.RDD}s.
+ */
+public class GroupCombineFunctions {
+
+  /**
+   * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} to a Spark RDD.
+   */
+  public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyOnly(
+      JavaRDD<WindowedValue<KV<K, V>>> rdd, KvCoder<K, V> coder) {
+    final Coder<K> keyCoder = coder.getKeyCoder();
+    final Coder<V> valueCoder = coder.getValueCoder();
+    // Use coders to convert objects in the PCollection to byte arrays, so they
+    // can be transferred over the network for the shuffle.
+    return rdd.map(WindowingHelpers.<KV<K, V>>unwindowFunction())
+        .mapToPair(TranslationUtils.<K, V>toPairFunction())
+        .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
+        .groupByKey()
+        .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder))
+        // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
+        .map(TranslationUtils.<K, Iterable<V>>fromPairFunction())
+        .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
+  }
+
+  /**
+   * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow} to a Spark RDD.
+   */
+  public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>>
+  groupAlsoByWindow(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> rdd,
+                    GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V> transform,
+                    SparkRuntimeContext runtimeContext,
+                    Accumulator<NamedAggregators> accum,
+                    KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder) {
+    //--- coders.
+    Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+    IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+        (IterableCoder<WindowedValue<V>>) inputValueCoder;
+    Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+    WindowedValue.WindowedValueCoder<V> inputIterableWindowedValueCoder =
+        (WindowedValue.WindowedValueCoder<V>) inputIterableElementCoder;
+    Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<?, W> windowingStrategy =
+        (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+
+    // GroupAlsoByWindow currently uses a dummy in-memory StateInternals
+    OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
+        new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+            windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(),
+                SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+    return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null));
+  }
+
+  /**
+   * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.Globally} transformation.
+   */
+  public static <InputT, AccumT, OutputT> OutputT
+  combineGlobally(JavaRDD<WindowedValue<InputT>> rdd,
+                  final Combine.CombineFn<InputT, AccumT, OutputT> globally,
+                  final Coder<InputT> iCoder,
+                  final Coder<AccumT> aCoder) {
+    // Use coders to convert objects in the PCollection to byte arrays, so they
+    // can be transferred over the network for the shuffle.
+    JavaRDD<byte[]> inRddBytes = rdd.map(WindowingHelpers.<InputT>unwindowFunction()).map(
+        CoderHelpers.toByteFunction(iCoder));
+    /*AccumT*/ byte[] acc = inRddBytes.aggregate(
+        CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
+        new Function2</*AccumT*/ byte[], /*InputT*/ byte[], /*AccumT*/ byte[]>() {
+          @Override
+          public /*AccumT*/ byte[] call(/*AccumT*/ byte[] ab, /*InputT*/ byte[] ib)
+              throws Exception {
+            AccumT a = CoderHelpers.fromByteArray(ab, aCoder);
+            InputT i = CoderHelpers.fromByteArray(ib, iCoder);
+            return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
+          }
+        },
+        new Function2</*AccumT*/ byte[], /*AccumT*/ byte[], /*AccumT*/ byte[]>() {
+          @Override
+          public /*AccumT*/ byte[] call(/*AccumT*/ byte[] a1b, /*AccumT*/ byte[] a2b)
+              throws Exception {
+            AccumT a1 = CoderHelpers.fromByteArray(a1b, aCoder);
+            AccumT a2 = CoderHelpers.fromByteArray(a2b, aCoder);
+            // don't use Guava's ImmutableList.of as values may be null
+            List<AccumT> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
+            AccumT merged = globally.mergeAccumulators(accumulators);
+            return CoderHelpers.toByteArray(merged, aCoder);
+          }
+        }
+    );
+    return globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
+  }
+
+  /**
+   * Apply a composite {@link org.apache.beam.sdk.transforms.Combine.PerKey} transformation.
+   */
+  public static <K, InputT, AccumT, OutputT> JavaRDD<WindowedValue<KV<K, OutputT>>>
+  combinePerKey(JavaRDD<WindowedValue<KV<K, InputT>>> rdd,
+                final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed,
+                final WindowedValue.FullWindowedValueCoder<K> wkCoder,
+                final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder,
+                final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder) {
+    // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
+    // since the functions passed to combineByKey don't receive the associated key of each
+    // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
+    // require the key in addition to the InputT's and AccumT's being merged/accumulated.
+    // Once Spark provides a way to include keys in the arguments of combine/merge functions,
+    // we won't need to duplicate the keys anymore.
+    // Key has to be windowed in order to group by window as well
+    JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
+        rdd.flatMapToPair(
+            new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>,
+                WindowedValue<KV<K, InputT>>>() {
+              @Override
+              public Iterable<Tuple2<WindowedValue<K>, WindowedValue<KV<K, InputT>>>>
+              call(WindowedValue<KV<K, InputT>> kv) {
+                  List<Tuple2<WindowedValue<K>,
+                      WindowedValue<KV<K, InputT>>>> tuple2s =
+                      Lists.newArrayListWithCapacity(kv.getWindows().size());
+                  for (BoundedWindow boundedWindow: kv.getWindows()) {
+                    WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
+                        boundedWindow.maxTimestamp(), boundedWindow, kv.getPane());
+                    tuple2s.add(new Tuple2<>(wk, kv));
+                  }
+                return tuple2s;
+              }
+            });
+    // Use coders to convert objects in the PCollection to byte arrays, so they
+    // can be transferred over the network for the shuffle.
+    JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
+        .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
+
+    // The output of combineByKey will be "AccumT" (accumulator)
+    // types rather than "OutputT" (final output types) since Combine.CombineFn
+    // only provides ways to merge AccumTs, and no way to merge OutputTs.
+    JavaPairRDD</*K*/ ByteArray, /*KV<K, AccumT>*/ byte[]> accumulatedBytes =
+        inRddDuplicatedKeyPairBytes.combineByKey(
+        new Function</*KV<K, InputT>*/ byte[], /*KV<K, AccumT>*/ byte[]>() {
+          @Override
+          public /*KV<K, AccumT>*/ byte[] call(/*KV<K, InputT>*/ byte[] input) {
+            WindowedValue<KV<K, InputT>> wkvi =
+                CoderHelpers.fromByteArray(input, wkviCoder);
+            AccumT va = keyed.createAccumulator(wkvi.getValue().getKey());
+            va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
+            WindowedValue<KV<K, AccumT>> wkva =
+                WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
+                wkvi.getWindows(), wkvi.getPane());
+            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+          }
+        },
+        new Function2</*KV<K, AccumT>*/ byte[],
+            /*KV<K, InputT>*/ byte[],
+            /*KV<K, AccumT>*/ byte[]>() {
+          @Override
+          public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc,
+              /*KV<K, InputT>*/ byte[] input) {
+            WindowedValue<KV<K, AccumT>> wkva =
+                CoderHelpers.fromByteArray(acc, wkvaCoder);
+            WindowedValue<KV<K, InputT>> wkvi =
+                CoderHelpers.fromByteArray(input, wkviCoder);
+            AccumT va =
+                keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
+                wkvi.getValue().getValue());
+            wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
+                wkva.getWindows(), wkva.getPane());
+            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+          }
+        },
+        new Function2</*KV<K, AccumT>*/ byte[],
+            /*KV<K, AccumT>*/ byte[],
+            /*KV<K, AccumT>*/ byte[]>() {
+          @Override
+          public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc1,
+              /*KV<K, AccumT>*/ byte[] acc2) {
+            WindowedValue<KV<K, AccumT>> wkva1 =
+                CoderHelpers.fromByteArray(acc1, wkvaCoder);
+            WindowedValue<KV<K, AccumT>> wkva2 =
+                CoderHelpers.fromByteArray(acc2, wkvaCoder);
+            AccumT va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
+                // don't use Guava's ImmutableList.of as values may be null
+                Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
+                wkva2.getValue().getValue())));
+            WindowedValue<KV<K, AccumT>> wkva =
+                WindowedValue.of(KV.of(wkva1.getValue().getKey(),
+                va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
+            return CoderHelpers.toByteArray(wkva, wkvaCoder);
+          }
+        });
+
+    JavaPairRDD<WindowedValue<K>, WindowedValue<OutputT>> extracted = accumulatedBytes
+        .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
+        .mapValues(new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
+              @Override
+              public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
+                return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
+                    acc.getValue().getValue()), acc.getTimestamp(), acc.getWindows(),
+                        acc.getPane());
+              }
+            });
+    return extracted.map(TranslationUtils.<WindowedValue<K>,
+        WindowedValue<OutputT>>fromPairFunction()).map(
+            new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>,
+                WindowedValue<KV<K, OutputT>>>() {
+              @Override
+              public WindowedValue<KV<K, OutputT>> call(KV<WindowedValue<K>,
+                  WindowedValue<OutputT>> kwvo) throws Exception {
+                WindowedValue<OutputT> wvo = kwvo.getValue();
+                KV<K, OutputT> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
+                return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
+              }
+            });
+  }
+}
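
A hedged sketch of calling the extracted helper directly, mirroring the gbko() evaluator in TransformTranslator further down; inRdd stands for a windowed input RDD and the coders are illustrative:

    KvCoder<String, Integer> coder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
    JavaRDD<WindowedValue<KV<String, Iterable<Integer>>>> grouped =
        GroupCombineFunctions.groupByKeyOnly(inRdd, coder);
    // Keys and values are round-tripped through their coders so only byte arrays
    // cross the shuffle, then re-wrapped as WindowedValues on the way out.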

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 4c44ffd..163cf13 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -24,12 +24,15 @@ import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.joda.time.Instant;
+
 import scala.Tuple2;
 
 /**
@@ -39,24 +42,50 @@ import scala.Tuple2;
  * @param <InputT> Input type for DoFunction.
  * @param <OutputT> Output type for DoFunction.
  */
-class MultiDoFnFunction<InputT, OutputT>
-    implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
+public class MultiDoFnFunction<InputT, OutputT>
+    implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>,
+        WindowedValue<?>> {
+  private final Accumulator<NamedAggregators> accum;
   private final OldDoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final TupleTag<OutputT> mMainOutputTag;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
-  MultiDoFnFunction(
+  /**
+   * @param accum          The Spark Accumulator that handles the Beam Aggregators.
+   * @param fn             DoFunction to be wrapped.
+   * @param runtimeContext Runtime to apply function in.
+   * @param mainOutputTag  The main output {@link TupleTag}.
+   * @param sideInputs     Side inputs used in DoFunction.
+   */
+  public MultiDoFnFunction(
+      Accumulator<NamedAggregators> accum,
       OldDoFn<InputT, OutputT> fn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this.accum = accum;
     this.mFunction = fn;
     this.mRuntimeContext = runtimeContext;
     this.mMainOutputTag = mainOutputTag;
     this.mSideInputs = sideInputs;
   }
 
+  /**
+   * @param fn             DoFunction to be wrapped.
+   * @param runtimeContext Runtime to apply function in.
+   * @param mainOutputTag  The main output {@link TupleTag}.
+   * @param sideInputs     Side inputs used in DoFunction.
+   */
+  public MultiDoFnFunction(
+      OldDoFn<InputT, OutputT> fn,
+      SparkRuntimeContext runtimeContext,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this(null, fn, runtimeContext, mainOutputTag, sideInputs);
+  }
+
+
   @Override
   public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
       call(Iterator<WindowedValue<InputT>> iter) throws Exception {
@@ -99,6 +128,15 @@ class MultiDoFnFunction<InputT, OutputT>
     }
 
     @Override
+    public Accumulator<NamedAggregators> getAccumulator() {
+      if (accum == null) {
+        throw new UnsupportedOperationException("SparkRunner does not provide Aggregator support "
+             + "for MultiDoFnFunction of type: " + mFunction.getClass().getCanonicalName());
+      }
+      return accum;
+    }
+
+    @Override
     protected void clearOutput() {
       outputs.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 0e7db9f..8127ddc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -18,14 +18,18 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.serializer.KryoSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The Spark context factory.
  */
 public final class SparkContextFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to
@@ -40,19 +44,20 @@ public final class SparkContextFactory {
   private SparkContextFactory() {
   }
 
-  public static synchronized JavaSparkContext getSparkContext(String master, String appName) {
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+  public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
+    // reuse should be ignored if the context is provided.
+    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !options.getUsesProvidedSparkContext()) {
       if (sparkContext == null) {
-        sparkContext = createSparkContext(master, appName);
-        sparkMaster = master;
-      } else if (!master.equals(sparkMaster)) {
+        sparkContext = createSparkContext(options);
+        sparkMaster = options.getSparkMaster();
+      } else if (!options.getSparkMaster().equals(sparkMaster)) {
         throw new IllegalArgumentException(String.format("Cannot reuse spark context "
-                + "with different spark master URL. Existing: %s, requested: %s.",
-            sparkMaster, master));
+            + "with different spark master URL. Existing: %s, requested: %s.",
+                sparkMaster, options.getSparkMaster()));
       }
       return sparkContext;
     } else {
-      return createSparkContext(master, appName);
+      return createSparkContext(options);
     }
   }
 
@@ -62,14 +67,25 @@ public final class SparkContextFactory {
     }
   }
 
-  private static JavaSparkContext createSparkContext(String master, String appName) {
-    SparkConf conf = new SparkConf();
-    if (!conf.contains("spark.master")) {
-      // set master if not set.
-      conf.setMaster(master);
+  private static JavaSparkContext createSparkContext(SparkPipelineOptions options) {
+    if (options.getUsesProvidedSparkContext()) {
+      LOG.info("Using a provided Spark Context");
+      JavaSparkContext jsc = options.getProvidedSparkContext();
+      if (jsc == null || jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was not created or was stopped");
+        throw new RuntimeException("The provided Spark context was not created or was stopped");
+      }
+      return jsc;
+    } else {
+      LOG.info("Creating a brand new Spark Context.");
+      SparkConf conf = new SparkConf();
+      if (!conf.contains("spark.master")) {
+        // set master if not set.
+        conf.setMaster(options.getSparkMaster());
+      }
+      conf.setAppName(options.getAppName());
+      conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
+      return new JavaSparkContext(conf);
     }
-    conf.setAppName(appName);
-    conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-    return new JavaSparkContext(conf);
   }
 }
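
A hedged sketch of the provided-context path from the user side; the getters appear in this diff, while the matching setters are assumed from the standard PipelineOptions bean convention rather than shown here:

    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("provided"));
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true);  // assumed setter for getUsesProvidedSparkContext()
    options.setProvidedSparkContext(jsc);       // assumed setter for getProvidedSparkContext()
    // SparkContextFactory.getSparkContext(options) then returns jsc as-is, failing
    // fast if it is null or already stopped, and ignores TEST_REUSE_SPARK_CONTEXT.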

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
deleted file mode 100644
index 02e8b3d..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * Pipeline {@link SparkRunner.Evaluator} for Spark.
- */
-public final class SparkPipelineEvaluator extends SparkRunner.Evaluator {
-
-  private final EvaluationContext ctxt;
-
-  public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) {
-    super(translator);
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  protected <TransformT extends PTransform<? super PInput, POutput>>
-  void doVisitTransform(TransformTreeNode
-      node) {
-    @SuppressWarnings("unchecked")
-    TransformT transform = (TransformT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
-    @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
-        (TransformEvaluator<TransformT>) translator.translate(transformClass);
-    LOG.info("Evaluating {}", transform);
-    AppliedPTransform<PInput, POutput, TransformT> appliedTransform =
-        AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
-    ctxt.setCurrentTransform(appliedTransform);
-    evaluator.evaluate(transform, ctxt);
-    ctxt.setCurrentTransform(null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
index 1f7ccf1..f77df5f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
@@ -27,5 +27,8 @@ public interface SparkPipelineTranslator {
   boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
 
   <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
-  translate(Class<TransformT> clazz);
+  translateBounded(Class<TransformT> clazz);
+
+  <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+  translateUnbounded(Class<TransformT> clazz);
 }
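
A minimal sketch of an implementation of the split bounded/unbounded contract (not one of the runner's actual translators), assuming it lives in the same translation package:

    class ThrowingTranslator implements SparkPipelineTranslator {
      @Override
      public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
        return false;  // a real translator would consult its evaluator registry here
      }

      @Override
      public <T extends PTransform<?, ?>> TransformEvaluator<T> translateBounded(Class<T> clazz) {
        throw new UnsupportedOperationException("no bounded evaluator registered for " + clazz);
      }

      @Override
      public <T extends PTransform<?, ?>> TransformEvaluator<T> translateUnbounded(Class<T> clazz) {
        throw new UnsupportedOperationException("no unbounded evaluator registered for " + clazz);
      }
    }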

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 566a272..fbaf5b8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Accumulator;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,11 +111,13 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   @Override
   public <AggregatprInputT, AggregatorOutputT>
   Aggregator<AggregatprInputT, AggregatorOutputT> createAggregatorInternal(
-      String named,
-      Combine.CombineFn<AggregatprInputT, ?, AggregatorOutputT> combineFn) {
-    return mRuntimeContext.createAggregator(named, combineFn);
+          String named,
+          Combine.CombineFn<AggregatprInputT, ?, AggregatorOutputT> combineFn) {
+    return mRuntimeContext.createAggregator(getAccumulator(), named, combineFn);
   }
 
+  public abstract Accumulator<NamedAggregators> getAccumulator();
+
   @Override
   public InputT element() {
     return windowedValue.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 4e4cd1a..94c1648 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -20,17 +20,14 @@ package org.apache.beam.runners.spark.translation;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import com.google.common.collect.ImmutableList;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.aggregators.AggAccumParam;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
 import org.apache.beam.sdk.AggregatorValues;
@@ -56,11 +53,6 @@ import org.apache.spark.metrics.MetricsSystem;
  * data flow program is launched.
  */
 public class SparkRuntimeContext implements Serializable {
-  /**
-   * An accumulator that is a map from names to aggregators.
-   */
-  private final Accumulator<NamedAggregators> accum;
-
   private final String serializedPipelineOptions;
 
   /**
@@ -69,10 +61,9 @@ public class SparkRuntimeContext implements Serializable {
   private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
   private transient CoderRegistry coderRegistry;
 
-  SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
-    final SparkPipelineOptions opts = pipeline.getOptions().as(SparkPipelineOptions.class);
-    accum = registerMetrics(jsc, opts);
-    serializedPipelineOptions = serializePipelineOptions(opts);
+  SparkRuntimeContext(Pipeline pipeline, JavaSparkContext jsc) {
+    this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
+    registerMetrics(pipeline.getOptions().as(SparkPipelineOptions.class), jsc);
   }
 
   private static String serializePipelineOptions(PipelineOptions pipelineOptions) {
@@ -91,10 +82,9 @@ public class SparkRuntimeContext implements Serializable {
     }
   }
 
-  private Accumulator<NamedAggregators> registerMetrics(final JavaSparkContext jsc,
-                                                        final SparkPipelineOptions opts) {
-    final NamedAggregators initialValue = new NamedAggregators();
-    final Accumulator<NamedAggregators> accum = jsc.accumulator(initialValue, new AggAccumParam());
+  private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) {
+    final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
+    final NamedAggregators initialValue = accum.value();
 
     if (opts.getEnableSparkSinks()) {
       final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
@@ -104,26 +94,28 @@ public class SparkRuntimeContext implements Serializable {
       metricsSystem.removeSource(aggregatorMetricSource);
       metricsSystem.registerSource(aggregatorMetricSource);
     }
-
-    return accum;
   }
 
   /**
    * Retrieves corresponding value of an aggregator.
    *
+   * @param accum          The Spark Accumulator holding all Aggregators.
    * @param aggregatorName Name of the aggregator to retrieve the value of.
    * @param typeClass      Type class of value to be retrieved.
    * @param <T>            Type of object to be returned.
    * @return The value of the aggregator.
    */
-  public <T> T getAggregatorValue(String aggregatorName, Class<T> typeClass) {
+  public <T> T getAggregatorValue(Accumulator<NamedAggregators> accum,
+                                  String aggregatorName,
+                                  Class<T> typeClass) {
     return accum.value().getValue(aggregatorName, typeClass);
   }
 
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
+  public <T> AggregatorValues<T> getAggregatorValues(Accumulator<NamedAggregators> accum,
+                                                     Aggregator<?, T> aggregator) {
     @SuppressWarnings("unchecked")
     Class<T> aggValueClass = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
-    final T aggregatorValue = getAggregatorValue(aggregator.getName(), aggValueClass);
+    final T aggregatorValue = getAggregatorValue(accum, aggregator.getName(), aggValueClass);
     return new AggregatorValues<T>() {
       @Override
       public Collection<T> getValues() {
@@ -144,14 +136,16 @@ public class SparkRuntimeContext implements Serializable {
   /**
    * Creates an aggregator and associates it with the specified name.
    *
+   * @param accum     Spark Accumulator.
    * @param named     Name of aggregator.
    * @param combineFn Combine function used in aggregation.
-   * @param <InputT>      Type of inputs to aggregator.
-   * @param <InterT>   Intermediate data type
-   * @param <OutputT>     Type of aggregator outputs.
+   * @param <InputT>  Type of inputs to aggregator.
+   * @param <InterT>  Intermediate data type
+   * @param <OutputT> Type of aggregator outputs.
    * @return Specified aggregator
    */
   public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator(
+      Accumulator<NamedAggregators> accum,
       String named,
       Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
     @SuppressWarnings("unchecked")

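With the accumulator removed from SparkRuntimeContext, every aggregator operation now takes it explicitly. A hedged sketch of the new call pattern from runner code, where jsc and runtimeContext are placeholders and "emptyLines" is an invented aggregator name:

    Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
    Aggregator<Long, Long> agg =
        runtimeContext.createAggregator(accum, "emptyLines", new Sum.SumLongFn());
    // Later, on the driver, read it back through the same accumulator:
    Long total = runtimeContext.getAggregatorValue(accum, "emptyLines", Long.class);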

[2/4] incubator-beam git commit: [BEAM-610] Enable spark's checkpointing mechanism for driver-failure recovery in streaming.

Posted by am...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 8341c6d..1a0511f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -19,39 +19,32 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
@@ -63,36 +56,30 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+
 import scala.Tuple2;
 
+
 /**
  * Supports translation between a Beam transform, and Spark's operations on RDDs.
  */
@@ -101,31 +88,6 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
-  /**
-   * Getter of the field.
-   */
-  public static class FieldGetter {
-    private final Map<String, Field> fields;
-
-    public FieldGetter(Class<?> clazz) {
-      this.fields = Maps.newHashMap();
-      for (Field f : clazz.getDeclaredFields()) {
-        f.setAccessible(true);
-        this.fields.put(f.getName(), f);
-      }
-    }
-
-    public <T> T get(String fieldname, Object value) {
-      try {
-        @SuppressWarnings("unchecked")
-        T fieldValue = (T) fields.get(fieldname).get(value);
-        return fieldValue;
-      } catch (IllegalAccessException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
   private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
     return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
       @SuppressWarnings("unchecked")
@@ -142,28 +104,18 @@ public final class TransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() {
+  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
     return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
       @Override
       public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
+        JavaRDD<WindowedValue<KV<K, V>>> inRDD =
+            (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform);
+
         @SuppressWarnings("unchecked")
-        KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        Coder<K> keyCoder = coder.getKeyCoder();
-        Coder<V> valueCoder = coder.getValueCoder();
-
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair(
-              toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction()))
-            .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
-            .groupByKey()
-            .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)))
-            // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
-            .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
-        context.setOutputRDD(transform, outRDD);
+        final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+
+        context.setOutputRDD(transform, GroupCombineFunctions.groupByKeyOnly(inRDD, coder));
       }
     };
   }
@@ -174,81 +126,52 @@ public final class TransformTranslator {
       @Override
       public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?>)
+        JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> inRDD =
+            (JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
                 context.getInputRDD(transform);
 
-        Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder =
-            context.getInput(transform).getCoder();
-        Coder<K> keyCoder = transform.getKeyCoder(inputCoder);
-        Coder<V> valueCoder = transform.getValueCoder(inputCoder);
-
         @SuppressWarnings("unchecked")
-        KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+        final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
             (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder();
-        Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
-
-        IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-            (IterableCoder<WindowedValue<V>>) inputValueCoder;
-        Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
-        WindowedValueCoder<V> inputIterableWindowedValueCoder =
-            (WindowedValueCoder<V>) inputIterableElementCoder;
 
-        Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+        final Accumulator<NamedAggregators> accum =
+            AccumulatorSingleton.getInstance(context.getSparkContext());
 
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, W> windowingStrategy =
-            (WindowingStrategy<?, W>) transform.getWindowingStrategy();
-
-        OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
-            new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-                windowingStrategy,
-                new InMemoryStateInternalsFactory<K>(),
-                SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-
-        // GroupAlsoByWindow current uses a dummy in-memory StateInternals
-        JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD =
-            inRDD.mapPartitions(
-                new DoFnFunction<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>>(
-                    gabwDoFn, context.getRuntimeContext(), null));
-
-        context.setOutputRDD(transform, outRDD);
+        context.setOutputRDD(transform, GroupCombineFunctions.groupAlsoByWindow(inRDD, transform,
+            context.getRuntimeContext(), accum, inputKvCoder));
       }
     };
   }
 
-  private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
-
   private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
   grouped() {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
       @Override
       public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
                            EvaluationContext context) {
-        Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed = GROUPED_FG.get("fn", transform);
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>) context.getInputRDD(transform);
-        context.setOutputRDD(transform,
-            inRDD.map(new KVFunction<>(keyed)));
+            (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>)
+                context.getInputRDD(transform);
+        context.setOutputRDD(transform, inRDD.map(
+            new TranslationUtils.CombineGroupedValues<>(transform)));
       }
     };
   }
 
-  private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
-
   private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>>
   combineGlobally() {
     return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
 
       @Override
       public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
-        final Combine.CombineFn<InputT, AccumT, OutputT> globally =
-            COMBINE_GLOBALLY_FG.get("fn", transform);
+        @SuppressWarnings("unchecked")
+        JavaRDD<WindowedValue<InputT>> inRdd =
+            (JavaRDD<WindowedValue<InputT>>) context.getInputRDD(transform);
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<InputT>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+        final Combine.CombineFn<InputT, AccumT, OutputT> globally =
+            (Combine.CombineFn<InputT, AccumT, OutputT>) transform.getFn();
 
         final Coder<InputT> iCoder = context.getInput(transform).getCoder();
         final Coder<AccumT> aCoder;
@@ -259,61 +182,26 @@ public final class TransformTranslator {
           throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
 
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaRDD<byte[]> inRddBytes = inRdd
-            .map(WindowingHelpers.<InputT>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(iCoder));
-
-        /*AccumT*/ byte[] acc = inRddBytes.aggregate(
-            CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
-            new Function2</*AccumT*/ byte[], /*InputT*/ byte[], /*AccumT*/ byte[]>() {
-              @Override
-              public /*AccumT*/ byte[] call(/*AccumT*/ byte[] ab, /*InputT*/ byte[] ib)
-                  throws Exception {
-                AccumT a = CoderHelpers.fromByteArray(ab, aCoder);
-                InputT i = CoderHelpers.fromByteArray(ib, iCoder);
-                return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
-              }
-            },
-            new Function2</*AccumT*/ byte[], /*AccumT*/ byte[], /*AccumT*/ byte[]>() {
-              @Override
-              public /*AccumT*/ byte[] call(/*AccumT*/ byte[] a1b, /*AccumT*/ byte[] a2b)
-                  throws Exception {
-                AccumT a1 = CoderHelpers.fromByteArray(a1b, aCoder);
-                AccumT a2 = CoderHelpers.fromByteArray(a2b, aCoder);
-                // don't use Guava's ImmutableList.of as values may be null
-                List<AccumT> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
-                AccumT merged = globally.mergeAccumulators(accumulators);
-                return CoderHelpers.toByteArray(merged, aCoder);
-              }
-            }
-        );
-        OutputT output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
-
-        Coder<OutputT> coder = context.getOutput(transform).getCoder();
+        final Coder<OutputT> oCoder = context.getOutput(transform).getCoder();
         JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
             // don't use Guava's ImmutableList.of as output may be null
-            CoderHelpers.toByteArrays(Collections.singleton(output), coder));
-        context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
+            CoderHelpers.toByteArrays(Collections.singleton(
+                GroupCombineFunctions.combineGlobally(inRdd, globally, iCoder, aCoder)), oCoder));
+        context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(oCoder))
             .map(WindowingHelpers.<OutputT>windowFunction()));
       }
     };
   }
 
-  private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
-
   private static <K, InputT, AccumT, OutputT>
   TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
     return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
       @Override
-      public void evaluate(Combine.PerKey<K, InputT, OutputT>
-                               transform, EvaluationContext context) {
-        final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed =
-            COMBINE_PERKEY_FG.get("fn", transform);
+      public void evaluate(Combine.PerKey<K, InputT, OutputT> transform,
+                           EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, InputT>>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<KV<K, InputT>>, ?>) context.getInputRDD(transform);
+        final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed =
+            (Combine.KeyedCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
 
         @SuppressWarnings("unchecked")
         KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>)
@@ -329,214 +217,66 @@ public final class TransformTranslator {
         }
         Coder<KV<K, InputT>> kviCoder = KvCoder.of(keyCoder, viCoder);
         Coder<KV<K, AccumT>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
-
-        // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
-        // since the functions passed to combineByKey don't receive the associated key of each
-        // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
-        // require the key in addition to the InputT's and AccumT's being merged/accumulated.
-        // Once Spark provides a way to include keys in the arguments of combine/merge functions,
-        // we won't need to duplicate the keys anymore.
-
-        // Key has to bw windowed in order to group by window as well
-        JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
-            inRdd.flatMapToPair(
-                new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>,
-                    WindowedValue<KV<K, InputT>>>() {
-                  @Override
-                  public Iterable<Tuple2<WindowedValue<K>,
-                      WindowedValue<KV<K, InputT>>>>
-                  call(WindowedValue<KV<K, InputT>> kv) {
-                      List<Tuple2<WindowedValue<K>,
-                          WindowedValue<KV<K, InputT>>>> tuple2s =
-                          Lists.newArrayListWithCapacity(kv.getWindows().size());
-                      for (BoundedWindow boundedWindow: kv.getWindows()) {
-                        WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
-                            boundedWindow.maxTimestamp(), boundedWindow, kv.getPane());
-                        tuple2s.add(new Tuple2<>(wk, kv));
-                      }
-                    return tuple2s;
-                  }
-                });
         //-- windowed coders
         final WindowedValue.FullWindowedValueCoder<K> wkCoder =
-                WindowedValue.FullWindowedValueCoder.of(keyCoder,
+            WindowedValue.FullWindowedValueCoder.of(keyCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
         final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
-                WindowedValue.FullWindowedValueCoder.of(kviCoder,
+            WindowedValue.FullWindowedValueCoder.of(kviCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
         final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
-                WindowedValue.FullWindowedValueCoder.of(kvaCoder,
+            WindowedValue.FullWindowedValueCoder.of(kvaCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
 
-        // Use coders to convert objects in the PCollection to byte arrays, so they
-        // can be transferred over the network for the shuffle.
-        JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
-            .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
-
-        // The output of combineByKey will be "AccumT" (accumulator)
-        // types rather than "OutputT" (final output types) since Combine.CombineFn
-        // only provides ways to merge VAs, and no way to merge VOs.
-        JavaPairRDD</*K*/ ByteArray, /*KV<K, AccumT>*/ byte[]> accumulatedBytes =
-            inRddDuplicatedKeyPairBytes.combineByKey(
-            new Function</*KV<K, InputT>*/ byte[], /*KV<K, AccumT>*/ byte[]>() {
-              @Override
-              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, InputT>*/ byte[] input) {
-                WindowedValue<KV<K, InputT>> wkvi =
-                    CoderHelpers.fromByteArray(input, wkviCoder);
-                AccumT va = keyed.createAccumulator(wkvi.getValue().getKey());
-                va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
-                WindowedValue<KV<K, AccumT>> wkva =
-                    WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
-                    wkvi.getWindows(), wkvi.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            },
-            new Function2</*KV<K, AccumT>*/ byte[],
-                /*KV<K, InputT>*/ byte[],
-                /*KV<K, AccumT>*/ byte[]>() {
-              @Override
-              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc,
-                  /*KV<K, InputT>*/ byte[] input) {
-                WindowedValue<KV<K, AccumT>> wkva =
-                    CoderHelpers.fromByteArray(acc, wkvaCoder);
-                WindowedValue<KV<K, InputT>> wkvi =
-                    CoderHelpers.fromByteArray(input, wkviCoder);
-                AccumT va =
-                    keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
-                    wkvi.getValue().getValue());
-                wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
-                    wkva.getWindows(), wkva.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            },
-            new Function2</*KV<K, AccumT>*/ byte[],
-                /*KV<K, AccumT>*/ byte[],
-                /*KV<K, AccumT>*/ byte[]>() {
-              @Override
-              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc1,
-                  /*KV<K, AccumT>*/ byte[] acc2) {
-                WindowedValue<KV<K, AccumT>> wkva1 =
-                    CoderHelpers.fromByteArray(acc1, wkvaCoder);
-                WindowedValue<KV<K, AccumT>> wkva2 =
-                    CoderHelpers.fromByteArray(acc2, wkvaCoder);
-                AccumT va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
-                    // don't use Guava's ImmutableList.of as values may be null
-                    Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
-                    wkva2.getValue().getValue())));
-                WindowedValue<KV<K, AccumT>> wkva =
-                    WindowedValue.of(KV.of(wkva1.getValue().getKey(),
-                    va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
-                return CoderHelpers.toByteArray(wkva, wkvaCoder);
-              }
-            });
-
-        JavaPairRDD<WindowedValue<K>, WindowedValue<OutputT>> extracted = accumulatedBytes
-            .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
-            .mapValues(
-                new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
-                  @Override
-                  public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
-                    return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
-                        acc.getValue().getValue()), acc.getTimestamp(),
-                        acc.getWindows(), acc.getPane());
-                  }
-                });
+        @SuppressWarnings("unchecked")
+        JavaRDD<WindowedValue<KV<K, InputT>>> inRdd =
+                (JavaRDD<WindowedValue<KV<K, InputT>>>) context.getInputRDD(transform);
 
-        context.setOutputRDD(transform,
-            fromPair(extracted)
-            .map(new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>,
-                WindowedValue<KV<K, OutputT>>>() {
-              @Override
-              public WindowedValue<KV<K, OutputT>> call(KV<WindowedValue<K>,
-                  WindowedValue<OutputT>> kwvo)
-                  throws Exception {
-                WindowedValue<OutputT> wvo = kwvo.getValue();
-                KV<K, OutputT> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
-                return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
-              }
-            }));
+        context.setOutputRDD(transform, GroupCombineFunctions.combinePerKey(inRdd, keyed, wkCoder,
+            wkviCoder, wkvaCoder));
       }
     };
   }
 
-  private static final class KVFunction<K, InputT, OutputT>
-      implements Function<WindowedValue<KV<K, Iterable<InputT>>>,
-      WindowedValue<KV<K, OutputT>>> {
-    private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed;
-
-     KVFunction(Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed) {
-      this.keyed = keyed;
-    }
-
-    @Override
-    public WindowedValue<KV<K, OutputT>> call(WindowedValue<KV<K,
-        Iterable<InputT>>> windowedKv)
-        throws Exception {
-      KV<K, Iterable<InputT>> kv = windowedKv.getValue();
-      return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
-          windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
-    }
-  }
-
-  private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) {
-    return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>() {
-      @Override
-      public Tuple2<K, V> call(KV<K, V> kv) {
-        return new Tuple2<>(kv.getKey(), kv.getValue());
-      }
-    });
-  }
-
-  private static <K, V> JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) {
-    return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-      @Override
-      public KV<K, V> call(Tuple2<K, V> t2) {
-        return KV.of(t2._1(), t2._2());
-      }
-    });
-  }
-
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
-        DoFnFunction<InputT, OutputT> dofn =
-            new DoFnFunction<>(transform.getFn(),
-                context.getRuntimeContext(),
-                getSideInputs(transform.getSideInputs(), context));
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
             (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
-        context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
+        Accumulator<NamedAggregators> accum =
+            AccumulatorSingleton.getInstance(context.getSparkContext());
+        Map<TupleTag<?>, BroadcastHelper<?>> sideInputs =
+            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
+        context.setOutputRDD(transform,
+            inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(),
+                context.getRuntimeContext(), sideInputs)));
       }
     };
   }
 
-  private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
-
-  private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
+  private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>
+  multiDo() {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
-        TupleTag<OutputT> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
-        MultiDoFnFunction<InputT, OutputT> multifn = new MultiDoFnFunction<>(
-            transform.getFn(),
-            context.getRuntimeContext(),
-            mainOutputTag,
-            getSideInputs(transform.getSideInputs(), context));
-
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
             (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+        Accumulator<NamedAggregators> accum =
+            AccumulatorSingleton.getInstance(context.getSparkContext());
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
-            .mapPartitionsToPair(multifn)
-            .cache();
-
+            .mapPartitionsToPair(
+                new MultiDoFnFunction<>(accum, transform.getFn(), context.getRuntimeContext(),
+                    transform.getMainOutputTag(), TranslationUtils.getSideInputs(
+                        transform.getSideInputs(), context)))
+            .cache();
         PCollectionTuple pct = context.getOutput(transform);
         for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
           @SuppressWarnings("unchecked")
           JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-              all.filter(new TupleTagFilter(e.getKey()));
+              all.filter(new TranslationUtils.TupleTagFilter(e.getKey()));
           @SuppressWarnings("unchecked")
           // Object is the best we can do since different outputs can have different tags
           JavaRDD<WindowedValue<Object>> values =
@@ -753,22 +493,17 @@ public final class TransformTranslator {
         JavaRDDLike<WindowedValue<T>, ?> inRDD =
             (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
 
-        @SuppressWarnings("unchecked")
-        WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
-
-        // Avoid running assign windows if both source and destination are global window
-        // or if the user has not specified the WindowFn (meaning they are just messing
-        // with triggering or allowed lateness)
-        if (windowFn == null
-            || (context.getInput(transform).getWindowingStrategy().getWindowFn()
-                    instanceof GlobalWindows
-                && windowFn instanceof GlobalWindows)) {
+        if (TranslationUtils.skipAssignWindows(transform, context)) {
           context.setOutputRDD(transform, inRDD);
         } else {
+          @SuppressWarnings("unchecked")
+          WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
           OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
-          DoFnFunction<T, T> dofn =
-              new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
-          context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
+          Accumulator<NamedAggregators> accum =
+              AccumulatorSingleton.getInstance(context.getSparkContext());
+          context.setOutputRDD(transform,
+              inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn,
+                  context.getRuntimeContext(), null)));
         }
       }
     };
@@ -822,42 +557,6 @@ public final class TransformTranslator {
     };
   }
 
-  private static final class TupleTagFilter<V>
-      implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
-
-    private final TupleTag<V> tag;
-
-    private TupleTagFilter(TupleTag<V> tag) {
-      this.tag = tag;
-    }
-
-    @Override
-    public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
-      return tag.equals(input._1());
-    }
-  }
-
-  private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(
-      List<PCollectionView<?>> views,
-      EvaluationContext context) {
-    if (views == null) {
-      return ImmutableMap.of();
-    } else {
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap();
-      for (PCollectionView<?> view : views) {
-        Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
-        Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
-        @SuppressWarnings("unchecked")
-        BroadcastHelper<?> helper =
-            BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
-        //broadcast side inputs
-        helper.broadcast(context.getSparkContext());
-        sideInputs.put(view.getTagInternal(), helper);
-      }
-      return sideInputs;
-    }
-  }
-
   private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
       .newHashMap();
 
@@ -870,7 +569,7 @@ public final class TransformTranslator {
     EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
-    EVALUATORS.put(GroupByKeyOnly.class, gbk());
+    EVALUATORS.put(GroupByKeyOnly.class, gbko());
     EVALUATORS.put(GroupAlsoByWindow.class, gabw());
     EVALUATORS.put(Combine.GroupedValues.class, grouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
@@ -883,17 +582,6 @@ public final class TransformTranslator {
     EVALUATORS.put(Window.Bound.class, window());
   }
 
-  public static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
-  getTransformEvaluator(Class<TransformT> clazz) {
-    @SuppressWarnings("unchecked")
-    TransformEvaluator<TransformT> transform =
-        (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
-    if (transform == null) {
-      throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
-    }
-    return transform;
-  }
-
   /**
    * Translator matches Beam transformation with the appropriate evaluator.
    */
@@ -905,17 +593,20 @@ public final class TransformTranslator {
     }
 
     @Override
-    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(
-        Class<TransformT> clazz) {
-      return getTransformEvaluator(clazz);
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+        translateBounded(Class<TransformT> clazz) {
+      @SuppressWarnings("unchecked") TransformEvaluator<TransformT> transformEvaluator =
+          (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+      checkState(transformEvaluator != null,
+          "No TransformEvaluator registered for BOUNDED transform %s", clazz);
+      return transformEvaluator;
     }
-  }
 
-  private static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>,
-      Serializable {
     @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return InMemoryStateInternals.forKey(key);
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+        translateUnbounded(Class<TransformT> clazz) {
+      throw new IllegalStateException("TransformTranslator used in a batch pipeline only "
+          + "supports BOUNDED transforms.");
     }
   }
 }
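
For context on the Combine.PerKey evaluator above (which now delegates to GroupCombineFunctions.combinePerKey), the evaluation still follows Spark's combineByKey contract. Below is a minimal, hypothetical sketch of that mapping for a plain (non-keyed) Beam CombineFn; it ignores windowing, the key-duplication workaround and the coder byte-array round-trip that the real implementation performs for the shuffle, and combineSketch is an illustrative name rather than runner code.

    // Sketch only; assumes the org.apache.spark.api.java.* and
    // org.apache.beam.sdk.transforms.Combine imports already used in this file.
    static <K, InputT, AccumT, OutputT> JavaPairRDD<K, OutputT> combineSketch(
        JavaPairRDD<K, InputT> input, final Combine.CombineFn<InputT, AccumT, OutputT> fn) {
      return input.combineByKey(
          // createCombiner: seed an accumulator with the first value seen for a key.
          new Function<InputT, AccumT>() {
            @Override
            public AccumT call(InputT v) {
              return fn.addInput(fn.createAccumulator(), v);
            }
          },
          // mergeValue: fold another value into a partition-local accumulator.
          new Function2<AccumT, InputT, AccumT>() {
            @Override
            public AccumT call(AccumT acc, InputT v) {
              return fn.addInput(acc, v);
            }
          },
          // mergeCombiners: merge accumulators produced by different partitions.
          new Function2<AccumT, AccumT, AccumT>() {
            @Override
            public AccumT call(AccumT a1, AccumT a2) {
              return fn.mergeAccumulators(java.util.Arrays.asList(a1, a2));
            }
          })
          // extractOutput: turn the final accumulator into the per-key output value.
          .mapValues(new Function<AccumT, OutputT>() {
            @Override
            public OutputT call(AccumT acc) {
              return fn.extractOutput(acc);
            }
          });
    }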

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
new file mode 100644
index 0000000..9b156fe
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+import scala.Tuple2;
+
+/**
+ * A set of utilities to help translating Beam transformations into Spark transformations.
+ */
+public final class TranslationUtils {
+
+  private TranslationUtils() {
+  }
+
+  /**
+   * In-memory state internals factory.
+   *
+   * @param <K> State key type.
+   */
+  static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>,
+      Serializable {
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return InMemoryStateInternals.forKey(key);
+    }
+  }
+
+  /**
+   * A {@link Combine.GroupedValues} function applied to grouped KVs.
+   *
+   * @param <K>       Grouped key type.
+   * @param <InputT>  Grouped values type.
+   * @param <OutputT> Output type.
+   */
+  public static class CombineGroupedValues<K, InputT, OutputT> implements
+      Function<WindowedValue<KV<K, Iterable<InputT>>>, WindowedValue<KV<K, OutputT>>> {
+    private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed;
+
+    public CombineGroupedValues(Combine.GroupedValues<K, InputT, OutputT> transform) {
+      //noinspection unchecked
+      keyed = (Combine.KeyedCombineFn<K, InputT, ?, OutputT>) transform.getFn();
+    }
+
+    @Override
+    public WindowedValue<KV<K, OutputT>> call(WindowedValue<KV<K, Iterable<InputT>>> windowedKv)
+        throws Exception {
+      KV<K, Iterable<InputT>> kv = windowedKv.getValue();
+      return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
+          windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
+    }
+  }
+
+  /**
+   * Checks if the window transformation should be applied or skipped.
+   *
+   * <p>
+   * Avoid running assign windows if both source and destination are global window
+   * or if the user has not specified the WindowFn (meaning they are just messing
+   * with triggering or allowed lateness).
+   * </p>
+   *
+   * @param transform The {@link Window.Bound} transformation.
+   * @param context   The {@link EvaluationContext}.
+   * @param <T>       PCollection type.
+   * @param <W>       {@link BoundedWindow} type.
+   * @return true if window assignment should be skipped.
+   */
+  public static <T, W extends BoundedWindow> boolean
+  skipAssignWindows(Window.Bound<T> transform, EvaluationContext context) {
+    @SuppressWarnings("unchecked")
+    WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
+    return windowFn == null
+        || (context.getInput(transform).getWindowingStrategy().getWindowFn()
+            instanceof GlobalWindows
+                && windowFn instanceof GlobalWindows);
+  }
+
+  /** Transform a pair stream into a value stream. */
+  public static <T1, T2> JavaDStream<T2> dStreamValues(JavaPairDStream<T1, T2> pairDStream) {
+    return pairDStream.map(new Function<Tuple2<T1, T2>, T2>() {
+      @Override
+      public T2 call(Tuple2<T1, T2> v1) throws Exception {
+        return v1._2();
+      }
+    });
+  }
+
+  /** {@link KV} to pair function. */
+  static <K, V> PairFunction<KV<K, V>, K, V> toPairFunction() {
+    return new PairFunction<KV<K, V>, K, V>() {
+      @Override
+      public Tuple2<K, V> call(KV<K, V> kv) {
+        return new Tuple2<>(kv.getKey(), kv.getValue());
+      }
+    };
+  }
+
+  /** A pair to {@link KV} function. */
+  static <K, V> Function<Tuple2<K, V>, KV<K, V>> fromPairFunction() {
+    return new Function<Tuple2<K, V>, KV<K, V>>() {
+      @Override
+      public KV<K, V> call(Tuple2<K, V> t2) {
+        return KV.of(t2._1(), t2._2());
+      }
+    };
+  }
+
+  /**
+   * A utility class to filter {@link TupleTag}s.
+   *
+   * @param <V> TupleTag type.
+   */
+  public static final class TupleTagFilter<V>
+      implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
+
+    private final TupleTag<V> tag;
+
+    public TupleTagFilter(TupleTag<V> tag) {
+      this.tag = tag;
+    }
+
+    @Override
+    public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
+      return tag.equals(input._1());
+    }
+  }
+
+  /**
+   * Create SideInputs as Broadcast variables.
+   *
+   * @param views   The {@link PCollectionView}s.
+   * @param context The {@link EvaluationContext}.
+   * @return a map of tagged {@link BroadcastHelper}s.
+   */
+  public static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(List<PCollectionView<?>> views,
+      EvaluationContext context) {
+    if (views == null) {
+      return ImmutableMap.of();
+    } else {
+      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap();
+      for (PCollectionView<?> view : views) {
+        Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
+        @SuppressWarnings("unchecked")
+        BroadcastHelper<?> helper =
+            BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
+        //broadcast side inputs
+        helper.broadcast(context.getSparkContext());
+        sideInputs.put(view.getTagInternal(), helper);
+      }
+      return sideInputs;
+    }
+  }
+
+}
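
As a quick illustration of the KV/Tuple2 helpers above, here is a hypothetical round-trip; toPairFunction() and fromPairFunction() are package-private, so such code would live in org.apache.beam.runners.spark.translation, and the local master and sample data are assumptions for illustration.

    JavaSparkContext jsc = new JavaSparkContext("local[2]", "translation-utils-sketch");
    JavaRDD<KV<String, Integer>> kvs =
        jsc.parallelize(java.util.Arrays.asList(KV.of("a", 1), KV.of("b", 2)));
    // KV -> Tuple2, so Spark's pair-RDD operations (groupByKey, combineByKey, ...) apply.
    JavaPairRDD<String, Integer> pairs =
        kvs.mapToPair(TranslationUtils.<String, Integer>toPairFunction());
    // ...and back to Beam's KV representation once the pair-level work is done.
    JavaRDD<KV<String, Integer>> back =
        pairs.map(TranslationUtils.<String, Integer>fromPairFunction());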

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
new file mode 100644
index 0000000..b7a407c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link JavaStreamingContext} factory for resilience.
+ * @see <a href="https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#how-to-configure-checkpointing">how-to-configure-checkpointing</a>
+ */
+public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
+  private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+
+  private final Pipeline pipeline;
+  private final SparkPipelineOptions options;
+
+  public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options) {
+    this.pipeline = pipeline;
+    this.options = options;
+  }
+
+  private StreamingEvaluationContext ctxt;
+
+  @Override
+  public JavaStreamingContext create() {
+    LOG.info("Creating a new Spark Streaming Context");
+
+    SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator(
+        new TransformTranslator.Translator());
+    Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+    LOG.info("Setting Spark streaming batchDuration to {} msec", batchDuration.milliseconds());
+
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+    ctxt = new StreamingEvaluationContext(jsc, pipeline, jssc,
+        options.getTimeout());
+    pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+    ctxt.computeOutputs();
+
+    // set checkpoint dir.
+    String checkpointDir = options.getCheckpointDir();
+    LOG.info("Checkpoint dir set to: {}", checkpointDir);
+    try {
+      // validate checkpoint dir and warn if not of a known durable filesystem.
+      URL checkpointDirUrl = new URL(checkpointDir);
+      if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
+        LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
+            + "this job may not recover properly or even at all.", checkpointDirUrl);
+      }
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Failed to form checkpoint dir URL. CheckpointDir should be in "
+          + "the form of hdfs:///path/to/dir or other reliable fs protocol, "
+              + "or file:///path/to/dir for local mode.", e);
+    }
+    jssc.checkpoint(checkpointDir);
+
+    return jssc;
+  }
+
+  public StreamingEvaluationContext getCtxt() {
+    return ctxt;
+  }
+}
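
The factory above is the hook that enables driver-failure recovery. A minimal sketch of how it would presumably be wired into Spark's checkpoint-aware getOrCreate follows (startOrRecover is an illustrative name, not necessarily the exact wiring used by SparkRunner): a fresh run invokes factory.create() and translates the pipeline, while a restart after failure rebuilds the streaming context from the checkpoint directory instead.

    static JavaStreamingContext startOrRecover(Pipeline pipeline, SparkPipelineOptions options) {
      SparkRunnerStreamingContextFactory factory =
          new SparkRunnerStreamingContextFactory(pipeline, options);
      // If a checkpoint exists under options.getCheckpointDir(), Spark restores the context
      // (including the translated DStream graph) from it; otherwise factory.create() is called.
      JavaStreamingContext jssc =
          JavaStreamingContext.getOrCreate(options.getCheckpointDir(), factory);
      jssc.start();
      return jssc;
    }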

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 2e4da44..5a43c55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -18,14 +18,18 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 
+import com.google.common.collect.Iterables;
+
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -82,11 +86,17 @@ public class StreamingEvaluationContext extends EvaluationContext {
     @SuppressWarnings("unchecked")
     JavaDStream<WindowedValue<T>> getDStream() {
       if (dStream == null) {
-        // create the DStream from values
+        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+            WindowedValue.getValueOnlyCoder(coder);
+        // create the DStream from queue
         Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
         for (Iterable<T> v : values) {
-          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
-          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
+          Iterable<WindowedValue<T>> windowedValues =
+              Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
+          JavaRDD<WindowedValue<T>> rdd = getSparkContext().parallelize(
+              CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
+                  CoderHelpers.fromByteFunction(windowCoder));
+          rddQueue.offer(rdd);
         }
         // create dstream from queue, one at a time, no defaults
         // mainly for unit test so no reason to have this configurable
@@ -102,7 +112,10 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
-    PValue pvalue = (PValue) getOutput(transform);
+    setStream((PValue) getOutput(transform), dStream);
+  }
+
+  <T> void setStream(PValue pvalue, JavaDStream<WindowedValue<T>> dStream) {
     DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
     pstreams.put(pvalue, dStreamHolder);
     leafStreams.add(dStreamHolder);
@@ -110,6 +123,10 @@ public class StreamingEvaluationContext extends EvaluationContext {
 
   boolean hasStream(PTransform<?, ?> transform) {
     PValue pvalue = (PValue) getInput(transform);
+    return hasStream(pvalue);
+  }
+
+  boolean hasStream(PValue pvalue) {
     return pstreams.containsKey(pvalue);
   }
 
@@ -141,19 +158,23 @@ public class StreamingEvaluationContext extends EvaluationContext {
 
   @Override
   public void computeOutputs() {
+    super.computeOutputs(); // in case the pipeline contains bounded branches as well.
     for (DStreamHolder<?> streamHolder : leafStreams) {
       computeOutput(streamHolder);
-    }
+    } // force a DStream action
   }
 
   private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
-    streamHolder.getDStream().foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
+    JavaDStream<WindowedValue<T>> dStream = streamHolder.getDStream();
+    // Cache at the DStream level rather than per-RDD,
+    // because the StorageLevel could differ if the DStream is windowed.
+    dStream.dstream().cache();
+    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
       @Override
       public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
-        rdd.rdd().cache();
         rdd.count();
       }
-    }); // force a DStream action
+    });
   }
 
   @Override
@@ -163,8 +184,9 @@ public class StreamingEvaluationContext extends EvaluationContext {
     } else {
       jssc.awaitTermination();
     }
-    //TODO: stop gracefully ?
-    jssc.stop(false, false);
+    // stop streaming context gracefully, so checkpointing (and other computations) get to
+    // finish before shutdown.
+    jssc.stop(false, true);
     state = State.DONE;
     super.close();
   }
@@ -197,7 +219,7 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   @Override
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
     super.setCurrentTransform(transform);
   }
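
The queue-backed DStream above now runs every value through its coder, so the RDDs placed on the queue hold only serialized, checkpoint-friendly data. A standalone sketch of that round-trip follows; the local master, batch interval, StringUtf8Coder and sample values are assumptions for illustration, and the remaining imports are those already present in this file.

    JavaSparkContext jsc = new JavaSparkContext("local[2]", "queue-dstream-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));
    Coder<String> coder = StringUtf8Coder.of();
    WindowedValue.ValueOnlyWindowedValueCoder<String> windowCoder =
        WindowedValue.getValueOnlyCoder(coder);
    // Wrap raw values as WindowedValues, encode them to byte[] for distribution,
    // and decode them again on the executors.
    Iterable<WindowedValue<String>> windowedValues =
        Iterables.transform(java.util.Arrays.asList("hello", "world"),
            WindowingHelpers.<String>windowValueFunction());
    JavaRDD<WindowedValue<String>> rdd =
        jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
            .map(CoderHelpers.fromByteFunction(windowCoder));
    Queue<JavaRDD<WindowedValue<String>>> rddQueue = new LinkedBlockingQueue<>();
    rddQueue.offer(rdd);
    // One RDD per batch, matching the "one at a time" behavior used above.
    JavaDStream<WindowedValue<String>> dStream = jssc.queueStream(rddQueue, true);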
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index c55be3d..64ddc57 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -17,53 +17,68 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import com.google.common.collect.Lists;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import kafka.serializer.Decoder;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.KafkaIO;
-import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.translation.DoFnFunction;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
+import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaPairInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.kafka.KafkaUtils;
+
 import scala.Tuple2;
 
 
@@ -114,19 +129,6 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<Create.Values<T>> create() {
-    return new TransformEvaluator<Create.Values<T>>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        Iterable<T> elems = transform.getElements();
-        Coder<T> coder = sec.getOutput(transform).getCoder();
-        sec.setDStreamFromQueue(transform, Collections.singletonList(elems), coder);
-      }
-    };
-  }
-
   private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
     return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
       @Override
@@ -146,173 +148,325 @@ public final class StreamingTransformTranslator {
       public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         PCollectionList<T> pcs = sec.getInput(transform);
-        JavaDStream<WindowedValue<T>> first =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
-        List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
-        for (int i = 1; i < pcs.size(); i++) {
-          rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+        // Since this is a streaming pipeline, at least one of the PCollections to "flatten" is
+        // unbounded, meaning it is represented by a DStream,
+        // so the unified result is an unbounded DStream as well.
+        final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
+        final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+        for (PCollection<T> pcol: pcs.getAll()) {
+          if (sec.hasStream(pcol)) {
+            dStreams.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcol));
+          } else {
+            rdds.add((JavaRDD<WindowedValue<T>>) context.getRDD(pcol));
+          }
+        }
+        // start by unifying streams into a single stream.
+        JavaDStream<WindowedValue<T>> unifiedStreams =
+            sec.getStreamingContext().union(dStreams.remove(0), dStreams);
+        // now unify in RDDs.
+        if (rdds.size() > 0) {
+          JavaDStream<WindowedValue<T>> joined = unifiedStreams.transform(
+              new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
+            @Override
+            public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> streamRdd)
+                throws Exception {
+              return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
+            }
+          });
+          sec.setStream(transform, joined);
+        } else {
+          sec.setStream(transform, unifiedStreams);
         }
-        JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
-        sec.setStream(transform, dstream);
       }
     };
   }
 
-  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(
-      final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<TransformT>() {
-      @SuppressWarnings("unchecked")
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
       @Override
-      public void evaluate(TransformT transform, EvaluationContext context) {
-        TransformEvaluator<TransformT> rddEvaluator =
-            rddTranslator.translate((Class<TransformT>) transform.getClass());
-
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        if (sec.hasStream(transform)) {
-          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
-              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
-              sec.getStream(transform);
-
-          sec.setStream(transform, dStream
-              .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
+        @SuppressWarnings("unchecked")
+        WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration));
+        } else if (windowFn instanceof SlidingWindows) {
+          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
+              .getMillis());
+          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
+        }
+        //--- then we apply windowing to the elements
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream2 =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        if (TranslationUtils.skipAssignWindows(transform, context)) {
+          sec.setStream(transform, dStream2);
         } else {
-          // if the transformation requires direct access to RDD (not in stream)
-          // this is used for "fake" transformations like with PAssert
-          rddEvaluator.evaluate(transform, context);
+          final OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+          final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+          JavaDStream<WindowedValue<T>> outStream = dStream2.transform(
+              new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
+            @Override
+            public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+              final Accumulator<NamedAggregators> accum =
+                AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+              return rdd.mapPartitions(
+                new DoFnFunction<>(accum, addWindowsDoFn, runtimeContext, null));
+            }
+          });
+          sec.setStream(transform, outStream);
         }
       }
     };
   }
 
-  /**
-   * RDD transform function If the transformation function doesn't have an input, create a fake one
-   * as an empty RDD.
-   *
-   * @param <TransformT> PTransform type
-   */
-  private static final class RDDTransform<TransformT extends PTransform<?, ?>>
-      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
-
-    private final StreamingEvaluationContext context;
-    private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<TransformT> rddEvaluator;
-    private final TransformT transform;
-
-
-    private RDDTransform(StreamingEvaluationContext context,
-                         TransformEvaluator<TransformT> rddEvaluator,
-        TransformT transform) {
-      this.context = context;
-      this.appliedPTransform = context.getCurrentTransform();
-      this.rddEvaluator = rddEvaluator;
-      this.transform = transform;
-    }
+  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() {
+    return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
+      @Override
+      public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public JavaRDD<WindowedValue<Object>>
-        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
-      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
-      context.setCurrentTransform(appliedPTransform);
-      context.setInputRDD(transform, rdd);
-      rddEvaluator.evaluate(transform, context);
-      if (!context.hasOutputRDD(transform)) {
-        // fake RDD as output
-        context.setOutputRDD(transform,
-            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<KV<K, V>>> dStream =
+            (JavaDStream<WindowedValue<KV<K, V>>>) sec.getStream(transform);
+
+        @SuppressWarnings("unchecked")
+        final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder();
+
+        JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+            dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
+                JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
+          @Override
+          public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(
+              JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception {
+            return GroupCombineFunctions.groupByKeyOnly(rdd, coder);
+          }
+        });
+        sec.setStream(transform, outStream);
       }
-      JavaRDD<WindowedValue<Object>> outRDD =
-          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
-      context.setCurrentTransform(existingAPT);
-      return outRDD;
-    }
+    };
   }
 
-  @SuppressWarnings("unchecked")
-  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(
-      final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<TransformT>() {
+  private static <K, V, W extends BoundedWindow>
+      TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
+    return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
       @Override
-      public void evaluate(TransformT transform, EvaluationContext context) {
-        TransformEvaluator<TransformT> rddEvaluator =
-            rddTranslator.translate((Class<TransformT>) transform.getClass());
+      public void evaluate(final GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
+        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> dStream =
+            (JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>)
+                sec.getStream(transform);
 
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        if (sec.hasStream(transform)) {
-          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
-              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
-              sec.getStream(transform);
+        @SuppressWarnings("unchecked")
+        final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+            (KvCoder<K, Iterable<WindowedValue<V>>>) sec.getInput(transform).getCoder();
+
+        JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+            dStream.transform(new Function<JavaRDD<WindowedValue<KV<K,
+                Iterable<WindowedValue<V>>>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
+              @Override
+              public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(JavaRDD<WindowedValue<KV<K,
+                  Iterable<WindowedValue<V>>>>> rdd) throws Exception {
+                final Accumulator<NamedAggregators> accum =
+                    AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+                return GroupCombineFunctions.groupAlsoByWindow(rdd, transform, runtimeContext,
+                    accum, inputKvCoder);
+              }
+            });
+        sec.setStream(transform, outStream);
+      }
+    };
+  }
 
-          dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
-        } else {
-          rddEvaluator.evaluate(transform, context);
-        }
+  private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
+  grouped() {
+    return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
+      @Override
+      public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
+                           EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
+            (JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>>) sec.getStream(transform);
+        sec.setStream(transform, dStream.map(
+            new TranslationUtils.CombineGroupedValues<>(transform)));
       }
     };
   }
 
-  /**
-   * RDD output function.
-   *
-   * @param <TransformT> PTransform type
-   */
-  private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>>
-      implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
+  private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>>
+  combineGlobally() {
+    return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
 
-    private final StreamingEvaluationContext context;
-    private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<TransformT> rddEvaluator;
-    private final TransformT transform;
+      @Override
+      public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        @SuppressWarnings("unchecked")
+        final Combine.CombineFn<InputT, AccumT, OutputT> globally =
+            (Combine.CombineFn<InputT, AccumT, OutputT>) transform.getFn();
 
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<InputT>> dStream =
+            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+
+        final Coder<InputT> iCoder = sec.getInput(transform).getCoder();
+        final Coder<OutputT> oCoder = sec.getOutput(transform).getCoder();
+        final Coder<AccumT> aCoder;
+        try {
+          aCoder = globally.getAccumulatorCoder(sec.getPipeline().getCoderRegistry(), iCoder);
+        } catch (CannotProvideCoderException e) {
+          throw new IllegalStateException("Could not determine coder for accumulator", e);
+        }
 
-    private RDDOutputOperator(StreamingEvaluationContext context,
-                              TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
-      this.context = context;
-      this.appliedPTransform = context.getCurrentTransform();
-      this.rddEvaluator = rddEvaluator;
-      this.transform = transform;
-    }
+        JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(
+            new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
+          @Override
+          public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
+              throws Exception {
+            JavaRDD<byte[]> outRdd = new JavaSparkContext(rdd.context()).parallelize(
+            // don't use Guava's ImmutableList.of as output may be null
+            CoderHelpers.toByteArrays(Collections.singleton(
+                GroupCombineFunctions.combineGlobally(rdd, globally, iCoder, aCoder)), oCoder));
+            return outRdd.map(CoderHelpers.fromByteFunction(oCoder)).map(
+                WindowingHelpers.<OutputT>windowFunction());
+          }
+        });
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
-      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
-      context.setCurrentTransform(appliedPTransform);
-      context.setInputRDD(transform, rdd);
-      rddEvaluator.evaluate(transform, context);
-      context.setCurrentTransform(existingAPT);
-    }
+        sec.setStream(transform, outStream);
+      }
+    };
   }
 
-  private static <T> TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
+  private static <K, InputT, AccumT, OutputT>
+  TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
+    return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
       @Override
-      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+      public void evaluate(Combine.PerKey<K, InputT, OutputT>
+                               transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        WindowFn<? super T, ?> windowFn = transform.getWindowFn();
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
-        if (windowFn instanceof FixedWindows) {
-          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration));
-        } else if (windowFn instanceof SlidingWindows) {
-          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
-              .getMillis());
-          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
+        final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed =
+            (Combine.KeyedCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
+            (JavaDStream<WindowedValue<KV<K, InputT>>>) sec.getStream(transform);
+
+        @SuppressWarnings("unchecked")
+        KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) sec.getInput(transform).getCoder();
+        Coder<K> keyCoder = inputCoder.getKeyCoder();
+        Coder<InputT> viCoder = inputCoder.getValueCoder();
+        Coder<AccumT> vaCoder;
+        try {
+          vaCoder = keyed.getAccumulatorCoder(
+              context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
+        } catch (CannotProvideCoderException e) {
+          throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
-        //--- then we apply windowing to the elements
-        OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
-        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
-            ((StreamingEvaluationContext) context).getRuntimeContext(), null);
+        Coder<KV<K, InputT>> kviCoder = KvCoder.of(keyCoder, viCoder);
+        Coder<KV<K, AccumT>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
+        //-- windowed coders
+        final WindowedValue.FullWindowedValueCoder<K> wkCoder =
+            WindowedValue.FullWindowedValueCoder.of(keyCoder,
+                sec.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
+        final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
+            WindowedValue.FullWindowedValueCoder.of(kviCoder,
+                sec.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
+        final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
+            WindowedValue.FullWindowedValueCoder.of(kvaCoder,
+                sec.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
+
+        JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
+            dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, InputT>>>,
+                JavaRDD<WindowedValue<KV<K, OutputT>>>>() {
+          @Override
+          public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
+              JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception {
+            return GroupCombineFunctions.combinePerKey(rdd, keyed, wkCoder, wkviCoder, wkvaCoder);
+          }
+        });
+
+        sec.setStream(transform, outStream);
+      }
+    };
+  }
+
+  private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
+    return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
+      @Override
+      public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
+                           final EvaluationContext context) {
+        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final Map<TupleTag<?>, BroadcastHelper<?>> sideInputs =
+            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         @SuppressWarnings("unchecked")
-        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
-            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
-            sec.getStream(transform);
-        sec.setStream(transform, dstream.mapPartitions(dofn));
+        JavaDStream<WindowedValue<InputT>> dStream =
+            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+
+        JavaDStream<WindowedValue<OutputT>> outStream =
+            dStream.transform(new Function<JavaRDD<WindowedValue<InputT>>,
+                JavaRDD<WindowedValue<OutputT>>>() {
+          @Override
+          public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd) throws
+              Exception {
+            final Accumulator<NamedAggregators> accum =
+                AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+            return rdd.mapPartitions(
+                new DoFnFunction<>(accum, transform.getFn(), runtimeContext, sideInputs));
+          }
+        });
+
+        sec.setStream(transform, outStream);
+      }
+    };
+  }
+
+  private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>
+  multiDo() {
+    return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
+      @Override
+      public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
+                           final EvaluationContext context) {
+        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final Map<TupleTag<?>, BroadcastHelper<?>> sideInputs =
+            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<InputT>> dStream =
+            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+        JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair(
+            new Function<JavaRDD<WindowedValue<InputT>>,
+                JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
+          @Override
+          public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
+              JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
+            final Accumulator<NamedAggregators> accum =
+                AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context()));
+            return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, transform.getFn(),
+                runtimeContext, transform.getMainOutputTag(), sideInputs));
+          }
+        }).cache();
+        PCollectionTuple pct = sec.getOutput(transform);
+        for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
+          @SuppressWarnings("unchecked")
+          JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
+              all.filter(new TranslationUtils.TupleTagFilter(e.getKey()));
+          @SuppressWarnings("unchecked")
+          // Object is the best we can do since different outputs can have different tags
+          JavaDStream<WindowedValue<Object>> values =
+              (JavaDStream<WindowedValue<Object>>)
+                  (JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
+          sec.setStream(e.getValue(), values);
+        }
       }
     };
   }
@@ -321,79 +475,54 @@ public final class StreamingTransformTranslator {
       .newHashMap();
 
   static {
+    EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, gbko());
+    EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow.class, gabw());
+    EVALUATORS.put(Combine.GroupedValues.class, grouped());
+    EVALUATORS.put(Combine.Globally.class, combineGlobally());
+    EVALUATORS.put(Combine.PerKey.class, combinePerKey());
+    EVALUATORS.put(ParDo.Bound.class, parDo());
+    EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
-    EVALUATORS.put(Create.Values.class, create());
     EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
     EVALUATORS.put(Window.Bound.class, window());
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }
 
-  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS = Sets
-      .newHashSet();
-
-  static {
-    //TODO - add support for the following
-    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
-    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
-      getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator rddTranslator) {
-    TransformEvaluator<TransformT> transform =
-        (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
-    if (transform == null) {
-      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
-        throw new UnsupportedOperationException("Beam transformation " + clazz
-          .getCanonicalName()
-          + " is currently unsupported by the Spark streaming pipeline");
-      }
-      // DStream transformations will transform an RDD into another RDD
-      // Actions will create output
-      // In Beam it depends on the PTransform's Input and Output class
-      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
-      if (PDone.class.equals(pTOutputClazz)) {
-        return foreachRDD(rddTranslator);
-      } else {
-        return rddTransform(rddTranslator);
-      }
-    }
-    return transform;
-  }
-
-  private static <TransformT extends PTransform<?, ?>> Class<?>
-  getPTransformOutputClazz(Class<TransformT> clazz) {
-    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
-    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
-  }
-
   /**
-   * Translator matches Beam transformation with the appropriate Spark streaming evaluator.
-   * rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation
+   * Translator that matches a Beam transformation with the appropriate evaluator.
    */
   public static class Translator implements SparkPipelineTranslator {
 
-    private final SparkPipelineTranslator rddTranslator;
+    private final SparkPipelineTranslator batchTranslator;
 
-    public Translator(SparkPipelineTranslator rddTranslator) {
-      this.rddTranslator = rddTranslator;
+    Translator(SparkPipelineTranslator batchTranslator) {
+      this.batchTranslator = batchTranslator;
     }
 
     @Override
     public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
-      // streaming includes rdd transformations as well
-      return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
+      // streaming pipelines may also include bounded (RDD-based) transformations
+      return EVALUATORS.containsKey(clazz) || batchTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+        translateBounded(Class<TransformT> clazz) {
+      TransformEvaluator<TransformT> transformEvaluator = batchTranslator.translateBounded(clazz);
+      checkState(transformEvaluator != null,
+          "No TransformEvaluator registered for BOUNDED transform %s", clazz);
+      return transformEvaluator;
     }
 
     @Override
     public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
-    translate(Class<TransformT> clazz) {
-      return getTransformEvaluator(clazz, rddTranslator);
+        translateUnbounded(Class<TransformT> clazz) {
+      @SuppressWarnings("unchecked") TransformEvaluator<TransformT> transformEvaluator =
+          (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
+      checkState(transformEvaluator != null,
+          "No TransformEvaluator registered for for UNBOUNDED transform %s", clazz);
+      return transformEvaluator;
     }
   }
 }
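
A note on the translator split introduced above: the streaming Translator keeps its own EVALUATORS map for unbounded (DStream) transforms and delegates everything else to the wrapped batch translator via translateBounded(). The following is a minimal, self-contained sketch of that dispatch pattern; the type names used here (Transform, Evaluator, PipelineTranslator, StreamingTranslator) are illustrative stand-ins, not the runner's actual classes.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the bounded/unbounded dispatch performed by the
// streaming Translator; all type names here are stand-ins, not runner API.
final class TranslatorDispatchSketch {

  interface Transform {}

  interface Evaluator<T extends Transform> {}

  interface PipelineTranslator {
    boolean hasTranslation(Class<? extends Transform> clazz);

    <T extends Transform> Evaluator<T> translate(Class<T> clazz);
  }

  static final class StreamingTranslator implements PipelineTranslator {
    // Evaluators for transforms that must run on DStreams (unbounded input).
    private final Map<Class<? extends Transform>, Evaluator<?>> evaluators = new HashMap<>();
    // Fallback used for transforms that also exist in the batch (RDD) world.
    private final PipelineTranslator batchTranslator;

    StreamingTranslator(PipelineTranslator batchTranslator) {
      this.batchTranslator = batchTranslator;
    }

    @Override
    public boolean hasTranslation(Class<? extends Transform> clazz) {
      // A streaming pipeline may still contain bounded transforms.
      return evaluators.containsKey(clazz) || batchTranslator.hasTranslation(clazz);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T extends Transform> Evaluator<T> translate(Class<T> clazz) {
      Evaluator<T> unbounded = (Evaluator<T>) evaluators.get(clazz);
      // Prefer the streaming (unbounded) evaluator when one is registered,
      // otherwise fall back to the batch (bounded) translator.
      return unbounded != null ? unbounded : batchTranslator.translate(clazz);
    }
  }
}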

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0feb6499/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 5c13b80..0e742eb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -52,8 +52,12 @@ public abstract class BroadcastHelper<T> implements Serializable {
 
   public abstract T getValue();
 
+  public abstract boolean isBroadcasted();
+
   public abstract void broadcast(JavaSparkContext jsc);
 
+  public abstract void unregister();
+
   /**
    * A {@link BroadcastHelper} that relies on the underlying
    * Spark serialization (Kryo) to broadcast values. This is appropriate when
@@ -77,9 +81,20 @@ public abstract class BroadcastHelper<T> implements Serializable {
     }
 
     @Override
+    public boolean isBroadcasted() {
+      return bcast != null;
+    }
+
+    @Override
     public void broadcast(JavaSparkContext jsc) {
       this.bcast = jsc.broadcast(value);
     }
+
+    @Override
+    public void unregister() {
+      this.bcast.destroy();
+      this.bcast = null;
+    }
   }
 
   /**
@@ -107,10 +122,21 @@ public abstract class BroadcastHelper<T> implements Serializable {
     }
 
     @Override
+    public boolean isBroadcasted() {
+      return bcast != null;
+    }
+
+    @Override
     public void broadcast(JavaSparkContext jsc) {
       this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
     }
 
+    @Override
+    public void unregister() {
+      this.bcast.destroy();
+      this.bcast = null;
+    }
+
     private T deserialize() {
       T val;
       try {