Posted to commits@beam.apache.org by am...@apache.org on 2016/03/15 19:48:03 UTC

[06/23] incubator-beam git commit: [BEAM-11] Spark runner directory structure and pom setup.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
new file mode 100644
index 0000000..fd6f5da
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ShardNameTemplateHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
+
+  public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
+  public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
+  public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";
+
+  private ShardNameTemplateHelper() {
+  }
+
+  public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
+      TaskAttemptContext context) throws IOException {
+    FileOutputCommitter committer =
+        (FileOutputCommitter) format.getOutputCommitter(context);
+    return new Path(committer.getWorkPath(), getOutputFile(context));
+  }
+
+  private static String getOutputFile(TaskAttemptContext context) {
+    TaskID taskId = context.getTaskAttemptID().getTaskID();
+    int partition = taskId.getId();
+
+    String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
+    String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
+    String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);
+    return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix;
+  }
+
+}

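For reference, a minimal sketch of how a caller might populate the three configuration keys above before a Hadoop write; the template value is only illustrative, since the exact syntax handled by ShardNameBuilder.replaceShardNumber is not shown in this diff:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, "part");
    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, "-SSSSS-of-NNNNN");
    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, ".avro");
    // getDefaultWorkFile(format, context) then resolves each task's output to
    // <committer work path>/part<template with the shard number substituted>.avro
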
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
new file mode 100644
index 0000000..4feaff6
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
+    implements ShardNameTemplateAware {
+
+  @Override
+  public void checkOutputSpecs(JobContext job) {
+    // don't fail if the output already exists
+  }
+
+  @Override
+  protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
+    Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+    return path.getFileSystem(context.getConfiguration()).create(path);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
new file mode 100644
index 0000000..922b906
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
+    implements ShardNameTemplateAware {
+
+  @Override
+  public void checkOutputSpecs(JobContext job) {
+    // don't fail if the output already exists
+  }
+
+  @Override
+  public Path getDefaultWorkFile(TaskAttemptContext context,
+      String extension) throws IOException {
+    // note that the passed-in extension is ignored since it comes from the template
+    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
new file mode 100644
index 0000000..1e53dce
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
+    implements ShardNameTemplateAware {
+
+  @Override
+  public void checkOutputSpecs(JobContext job) {
+    // don't fail if the output already exists
+  }
+
+  @Override
+  public Path getDefaultWorkFile(TaskAttemptContext context,
+      String extension) throws IOException {
+    // note that the passed-in extension is ignored since it comes from the template
+    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+  }
+
+}

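All three templated formats above relax checkOutputSpecs and delegate file naming to ShardNameTemplateHelper. A hedged sketch of how one of them might be wired into a Spark write; the pair RDD, key/value types and paths are placeholders, and the runner code that actually performs this call is not part of this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaPairRDD;

    void writeTemplated(JavaPairRDD<Text, NullWritable> rdd, String outputDir) {
      Configuration conf = new Configuration();
      conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, "result");
      conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, "-SSSSS-of-NNNNN");
      conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, ".txt");
      // checkOutputSpecs is a no-op, so an existing output directory does not fail the job
      rdd.saveAsNewAPIHadoopFile(outputDir, Text.class, NullWritable.class,
          TemplatedTextOutputFormat.class, conf);
    }
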
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..17edba3
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+  @Description("Timeout to wait (in msec) for the streaming execution to stop, -1 runs until " +
+          "execution is stopped")
+  @Default.Long(-1)
+  Long getTimeout();
+
+  void setTimeout(Long timeout);
+
+  @Override
+  @Default.Boolean(true)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark streaming dataflow pipeline job")
+  String getAppName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
new file mode 100644
index 0000000..822feb4
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+public final class SparkStreamingPipelineOptionsFactory {
+
+  private SparkStreamingPipelineOptionsFactory() {
+  }
+
+  public static SparkStreamingPipelineOptions create() {
+    return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+  }
+}

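A minimal usage sketch for the factory above; the timeout and application name values are illustrative, and the setters are assumed to come from the parent options interfaces:

    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
    options.setTimeout(60000L);  // stop the streaming execution after one minute
    options.setAppName("my-streaming-job");
    // isStreaming() defaults to true and getAppName() defaults to
    // "spark streaming dataflow pipeline job" unless overridden as above
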
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..2c5414d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(
+        SparkStreamingPipelineOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
new file mode 100644
index 0000000..9d1d786
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+import org.apache.beam.runners.spark.EvaluationContext;
+import org.apache.beam.runners.spark.SparkRuntimeContext;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+/**
+ * Evaluation context for streaming pipelines; tracks the DStream created for each PValue.
+ */
+public class StreamingEvaluationContext extends EvaluationContext {
+
+  private final JavaStreamingContext jssc;
+  private final long timeout;
+  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
+  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
+
+  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      JavaStreamingContext jssc, long timeout) {
+    super(jsc, pipeline);
+    this.jssc = jssc;
+    this.timeout = timeout;
+  }
+
+  /**
+   * DStream holder. Can also create a DStream from a supplied queue of values, but this is
+   * mainly for testing.
+   */
+  private class DStreamHolder<T> {
+
+    private Iterable<Iterable<T>> values;
+    private Coder<T> coder;
+    private JavaDStream<WindowedValue<T>> dStream;
+
+    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+      this.values = values;
+      this.coder = coder;
+    }
+
+    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+      this.dStream = dStream;
+    }
+
+    @SuppressWarnings("unchecked")
+    JavaDStream<WindowedValue<T>> getDStream() {
+      if (dStream == null) {
+        // create the DStream from values
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
+          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
+        }
+        // create dstream from queue, one at a time, no defaults
+        // mainly for unit test so no reason to have this configurable
+        dStream = jssc.queueStream(rddQueue, true);
+      }
+      return dStream;
+    }
+  }
+
+  <T> void setDStreamFromQueue(
+      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
+  }
+
+  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
+    PValue pvalue = (PValue) getOutput(transform);
+    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
+    pstreams.put(pvalue, dStreamHolder);
+    leafStreams.add(dStreamHolder);
+  }
+
+  boolean hasStream(PTransform<?, ?> transform) {
+    PValue pvalue = (PValue) getInput(transform);
+    return pstreams.containsKey(pvalue);
+  }
+
+  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+    return getStream((PValue) getInput(transform));
+  }
+
+  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
+    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
+    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
+    leafStreams.remove(dStreamHolder);
+    return dStream;
+  }
+
+  // used to set the RDD from the DStream in the RDDHolder for transformation
+  <T> void setInputRDD(
+      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    setRDD((PValue) getInput(transform), rdd);
+  }
+
+  // used to get the RDD transformation output and use it as the DStream transformation output
+  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+    return getRDD((PValue) getOutput(transform));
+  }
+
+  public JavaStreamingContext getStreamingContext() {
+    return jssc;
+  }
+
+  @Override
+  protected void computeOutputs() {
+    for (DStreamHolder<?> streamHolder : leafStreams) {
+      computeOutput(streamHolder);
+    }
+  }
+
+  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
+    streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
+      @Override
+      public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.rdd().cache();
+        rdd.count();
+        return null;
+      }
+    }); // force a DStream action
+  }
+
+  @Override
+  public void close() {
+    if (timeout > 0) {
+      jssc.awaitTerminationOrTimeout(timeout);
+    } else {
+      jssc.awaitTermination();
+    }
+    //TODO: stop gracefully ?
+    jssc.stop(false, false);
+    state = State.DONE;
+    super.close();
+  }
+
+  private State state = State.RUNNING;
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  //---------------- override in order to expose in package
+  @Override
+  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    return super.getInput(transform);
+  }
+  @Override
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return super.getOutput(transform);
+  }
+
+  @Override
+  protected JavaSparkContext getSparkContext() {
+    return super.getSparkContext();
+  }
+
+  @Override
+  protected SparkRuntimeContext getRuntimeContext() {
+    return super.getRuntimeContext();
+  }
+
+  @Override
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    super.setCurrentTransform(transform);
+  }
+
+  @Override
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return super.getCurrentTransform();
+  }
+
+  @Override
+  protected <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    super.setOutputRDD(transform, rdd);
+  }
+
+  @Override
+  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+      Coder<T> coder) {
+    super.setOutputRDDFromValues(transform, values, coder);
+  }
+
+  @Override
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    return super.hasOutputRDD(transform);
+  }
+}

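The queue-backed path in DStreamHolder mirrors Spark's own utility for building a DStream from pre-computed RDDs, which is how the context supports tests. A standalone sketch of that underlying mechanism, with placeholder data:

    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    JavaDStream<String> queueBackedStream(JavaStreamingContext jssc) {
      Queue<JavaRDD<String>> rddQueue = new LinkedBlockingQueue<>();
      rddQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("a", "b")));
      rddQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("c")));
      // oneAtATime = true: each batch interval consumes exactly one queued RDD
      return jssc.queueStream(rddQueue, true);
    }
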
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
new file mode 100644
index 0000000..c78c7fa
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import com.google.common.reflect.TypeToken;
+import kafka.serializer.Decoder;
+
+import org.apache.beam.runners.spark.DoFnFunction;
+import org.apache.beam.runners.spark.EvaluationContext;
+import org.apache.beam.runners.spark.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.TransformEvaluator;
+import org.apache.beam.runners.spark.TransformTranslator;
+import org.apache.beam.runners.spark.WindowingHelpers;
+import org.apache.beam.runners.spark.io.ConsoleIO;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import scala.Tuple2;
+
+
+/**
+ * Supports translation between a Dataflow transform and Spark's operations on DStreams.
+ */
+public final class StreamingTransformTranslator {
+
+  private StreamingTransformTranslator() {
+  }
+
+  private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
+    return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
+      @Override
+      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            ((StreamingEvaluationContext) context).getStream(transform);
+        dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
+      }
+    };
+  }
+
+  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
+    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+      @Override
+      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        JavaStreamingContext jssc = sec.getStreamingContext();
+        Class<K> keyClazz = transform.getKeyClass();
+        Class<V> valueClazz = transform.getValueClass();
+        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
+        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
+        Map<String, String> kafkaParams = transform.getKafkaParams();
+        Set<String> topics = transform.getTopics();
+        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
+                valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
+        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
+            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
+            return KV.of(t2._1(), t2._2());
+          }
+        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
+        sec.setStream(transform, inputStream);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        Iterable<T> elems = transform.getElements();
+        Coder<T> coder = sec.getOutput(transform).getCoder();
+        if (coder != VoidCoder.of()) {
+          // actual create
+          sec.setOutputRDDFromValues(transform, elems, coder);
+        } else {
+          // fake create as an input
+          // creates a stream with a single batch containing a single null element
+          // to invoke following transformations once
+          // to support DataflowAssert
+          sec.setDStreamFromQueue(transform,
+              Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
+              (Coder<Void>) coder);
+        }
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+      @Override
+      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        Iterable<Iterable<T>> values = transform.getQueuedValues();
+        Coder<T> coder = sec.getOutput(transform).getCoder();
+        sec.setDStreamFromQueue(transform, values, coder);
+      }
+    };
+  }
+
+  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        PCollectionList<T> pcs = sec.getInput(transform);
+        JavaDStream<WindowedValue<T>> first =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
+        List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
+        for (int i = 1; i < pcs.size(); i++) {
+          rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+        }
+        JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
+        sec.setStream(transform, dstream);
+      }
+    };
+  }
+
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              sec.getStream(transform);
+
+          sec.setStream(transform, dStream
+              .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
+        } else {
+          // if the transformation requires direct access to RDD (not in stream)
+          // this is used for "fake" transformations like with DataflowAssert
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD transform function. If the transformation doesn't have an input, a fake one is
+   * created as an empty RDD.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDTransform<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public JavaRDD<WindowedValue<Object>>
+        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      if (!context.hasOutputRDD(transform)) {
+        // fake RDD as output
+        context.setOutputRDD(transform,
+            context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+      }
+      JavaRDD<WindowedValue<Object>> outRDD =
+          (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
+      context.setCurrentTransform(existingAPT);
+      return outRDD;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+      final SparkPipelineTranslator rddTranslator) {
+    return new TransformEvaluator<PT>() {
+      @Override
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
+
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        if (sec.hasStream(transform)) {
+          JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+              (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+              sec.getStream(transform);
+
+          dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform));
+        } else {
+          rddEvaluator.evaluate(transform, context);
+        }
+      }
+    };
+  }
+
+  /**
+   * RDD output function.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
+
+    private final StreamingEvaluationContext context;
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+
+    private RDDOutputOperator(StreamingEvaluationContext context,
+        TransformEvaluator<PT> rddEvaluator, PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      context.setInputRDD(transform, rdd);
+      rddEvaluator.evaluate(transform, context);
+      context.setCurrentTransform(existingAPT);
+      return null;
+    }
+  }
+
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
+      @Override
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        //--- first we apply windowing to the stream
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> dStream =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration));
+        } else if (windowFn instanceof SlidingWindows) {
+          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
+              .getMillis());
+          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
+              .getMillis());
+          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
+        }
+        //--- then we apply windowing to the elements
+        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+        DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
+            sec.getRuntimeContext(), null);
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+            sec.getStream(transform);
+        sec.setStream(transform, dstream.mapPartitions(dofn));
+      }
+    };
+  }
+
+  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
+      Maps.newHashMap();
+
+  static {
+    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
+    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(Create.Values.class, create());
+    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
+    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+  }
+
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS =
+      Sets.newHashSet();
+
+  static {
+    //TODO - add support for the following
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    if (transform == null) {
+      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
+        throw new UnsupportedOperationException("Dataflow transformation "
+            + clazz.getCanonicalName()
+            + " is currently unsupported by the Spark streaming pipeline");
+      }
+      // DStream transformations will transform an RDD into another RDD
+      // Actions will create output
+      // In Dataflow it depends on the PTransform's Input and Output class
+      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
+      if (PDone.class.equals(pTOutputClazz)) {
+        return foreachRDD(rddTranslator);
+      } else {
+        return rddTransform(rddTranslator);
+      }
+    }
+    return transform;
+  }
+
+  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+    Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
+    return TypeToken.of(clazz).resolveType(types[1]).getRawType();
+  }
+
+  /**
+   * Translator that matches a Dataflow transformation with the appropriate Spark streaming
+   * evaluator. rddTranslator applies the batch (RDD) evaluators inside transform/foreachRDD.
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    private final SparkPipelineTranslator rddTranslator;
+
+    public Translator(SparkPipelineTranslator rddTranslator) {
+      this.rddTranslator = rddTranslator;
+    }
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      // streaming includes rdd transformations as well
+      return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+      return getTransformEvaluator(clazz, rddTranslator);
+    }
+  }
+}

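The window() evaluator above maps a Dataflow WindowFn onto Spark's DStream windowing before re-assigning windows to the elements. A condensed sketch of just that size/period conversion, assuming a placeholder JavaDStream<String>:

    import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    JavaDStream<String> applyWindow(JavaDStream<String> stream, WindowFn<?, ?> windowFn) {
      if (windowFn instanceof FixedWindows) {
        Duration size = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis());
        return stream.window(size);
      } else if (windowFn instanceof SlidingWindows) {
        SlidingWindows sliding = (SlidingWindows) windowFn;
        Duration size = Durations.milliseconds(sliding.getSize().getMillis());
        Duration slide = Durations.milliseconds(sliding.getPeriod().getMillis());
        return stream.window(size, slide);
      }
      return stream;  // e.g. GlobalWindows: stay on the stream's batch duration
    }
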
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
new file mode 100644
index 0000000..6844011
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.TransformTranslator;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+
+
+/**
+ * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing and derive the batch duration.
+ */
+public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+
+  // Currently, Spark streaming recommends batches no smaller than 500 msec
+  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
+
+  private boolean windowing;
+  private Duration batchDuration;
+
+  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
+    super(translator);
+  }
+
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+  @Override
+  protected <PT extends PTransform<? super PInput, POutput>> void
+      doVisitTransform(TransformTreeNode node) {
+    @SuppressWarnings("unchecked")
+    PT transform = (PT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    if (transformClass.isAssignableFrom(Window.Bound.class)) {
+      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
+      if (windowFn instanceof FixedWindows) {
+        setBatchDuration(((FixedWindows) windowFn).getSize());
+      } else if (windowFn instanceof SlidingWindows) {
+        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
+          throw new UnsupportedOperationException("Spark does not support window offsets");
+        }
+        // A sliding window's size can also set the batch duration; applying the
+        // transformation later adds the "slide"
+        setBatchDuration(((SlidingWindows) windowFn).getSize());
+      } else if (!(windowFn instanceof GlobalWindows)) {
+        throw new IllegalStateException("Windowing function not supported: " + windowFn);
+      }
+    }
+  }
+
+  private void setBatchDuration(org.joda.time.Duration duration) {
+    long durationMillis = duration.getMillis();
+    // validate window size
+    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
+      throw new IllegalArgumentException("Windowing of size " + durationMillis +
+          " msec is not supported!");
+    }
+    // choose the smallest duration as Spark's batch duration; larger ones will be handled
+    // as window functions over the batched stream
+    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
+      this.batchDuration = Durations.milliseconds(durationMillis);
+    }
+    windowing = true;
+  }
+
+  public boolean isWindowing() {
+    return windowing;
+  }
+
+  public Duration getBatchDuration() {
+    return batchDuration;
+  }
+}

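A hedged illustration of the detector's effect, assuming a placeholder PCollection<String> named words: with the fixed window below, setBatchDuration receives 5000 msec, so the smallest window in the pipeline becomes the Spark streaming batch duration, and anything under the 500 msec floor is rejected:

    import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    words.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))));
    // after traversal: detector.isWindowing() == true
    //                  detector.getBatchDuration().milliseconds() == 5000
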
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
new file mode 100644
index 0000000..af831c6
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BroadcastHelper<T> implements Serializable {
+
+  /**
+   * If the property {@code dataflow.spark.directBroadcast} is set to
+   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
+   * in View objects. By default this property is not set, and values are coded using
+   * the appropriate {@link Coder}.
+   */
+  public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
+
+  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
+
+  public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) {
+    if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) {
+      return new DirectBroadcastHelper<>(value);
+    }
+    return new CodedBroadcastHelper<>(value, coder);
+  }
+
+  public abstract T getValue();
+
+  public abstract void broadcast(JavaSparkContext jsc);
+
+  /**
+   * A {@link BroadcastHelper} that relies on the underlying
+   * Spark serialization (Kryo) to broadcast values. This is appropriate when
+   * broadcasting very large values, since no copy of the object is made.
+   * @param <T>
+   */
+  static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<T> bcast;
+    private transient T value;
+
+    DirectBroadcastHelper(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public synchronized T getValue() {
+      if (value == null) {
+        value = bcast.getValue();
+      }
+      return value;
+    }
+
+    @Override
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(value);
+    }
+  }
+
+  /**
+   * A {@link BroadcastHelper} that uses a
+   * {@link Coder} to encode values as byte arrays
+   * before broadcasting.
+   * @param <T>
+   */
+  static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<byte[]> bcast;
+    private final Coder<T> coder;
+    private transient T value;
+
+    CodedBroadcastHelper(T value, Coder<T> coder) {
+      this.value = value;
+      this.coder = coder;
+    }
+
+    @Override
+    public synchronized T getValue() {
+      if (value == null) {
+        value = deserialize();
+      }
+      return value;
+    }
+
+    @Override
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
+    }
+
+    private T deserialize() {
+      T val;
+      try {
+        val = coder.decode(new ByteArrayInputStream(bcast.value()),
+            new Coder.Context(true));
+      } catch (IOException ioe) {
+        // this should never happen; log it if it does
+        LOG.warn("Failed to decode broadcast value", ioe);
+        val = null;
+      }
+      return val;
+    }
+  }
+}

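A minimal usage sketch for BroadcastHelper; the coder and value are placeholders:

    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import org.apache.spark.api.java.JavaSparkContext;

    void broadcastExample(JavaSparkContext jsc) {
      BroadcastHelper<String> helper = BroadcastHelper.create("side input", StringUtf8Coder.of());
      helper.broadcast(jsc);  // must run on the driver before the value is read
      // on executors, getValue() lazily decodes (or fetches) the broadcast value
      String value = helper.getValue();
    }

Setting -Ddataflow.spark.directBroadcast=true switches create() to the Kryo-backed DirectBroadcastHelper, which skips the encode/decode through the Coder.
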
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
new file mode 100644
index 0000000..7679b9c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.primitives.UnsignedBytes;
+
+public class ByteArray implements Serializable, Comparable<ByteArray> {
+
+  private final byte[] value;
+
+  public ByteArray(byte[] value) {
+    this.value = value;
+  }
+
+  public byte[] getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ByteArray byteArray = (ByteArray) o;
+    return Arrays.equals(value, byteArray.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return value != null ? Arrays.hashCode(value) : 0;
+  }
+
+  @Override
+  public int compareTo(ByteArray other) {
+    return UnsignedBytes.lexicographicalComparator().compare(value, other.value);
+  }
+}

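ByteArray is a value-semantics wrapper: a raw byte[] key uses identity-based equals/hashCode, which breaks Spark operations that group or sort by key. A hedged sketch of the intended wrapping, where key and keyCoder are placeholders and CoderHelpers.toByteArray is the helper already used by BroadcastHelper above:

    byte[] keyBytes = CoderHelpers.toByteArray(key, keyCoder);
    ByteArray shuffleKey = new ByteArray(keyBytes);
    // equal encodings now compare equal and hash consistently for groupByKey,
    // and the unsigned lexicographic compareTo supports sortByKey
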
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
index 5733a86..98387a6 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
-com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.SparkPipelineOptionsRegistrar
+org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file

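These files are standard java.util.ServiceLoader manifests; the Dataflow SDK discovers PipelineOptionsRegistrar implementations through them, so after this rename the Spark options register under their new org.apache.beam packages without any user-side code change. A sketch of the equivalent lookup:

    import java.util.ServiceLoader;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;

    for (PipelineOptionsRegistrar registrar : ServiceLoader.load(PipelineOptionsRegistrar.class)) {
      // expected to include SparkPipelineOptionsRegistrar and
      // SparkStreamingPipelineOptionsRegistrar from the entries above
      System.out.println(registrar.getClass().getName());
    }
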
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
index 26e0b3a..972b1a3 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-com.cloudera.dataflow.spark.SparkPipelineRunnerRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.SparkPipelineRunnerRegistrar
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
deleted file mode 100644
index 29a73b6..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.hadoop;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder.
- */
-public class WritableCoderTest {
-
-  @Test
-  public void testIntWritableEncoding() throws Exception {
-    IntWritable value = new IntWritable(42);
-    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, value);
-  }
-
-  @Test
-  public void testNullWritableEncoding() throws Exception {
-    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
-    CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
deleted file mode 100644
index ea4cc38..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Lists;
-import com.google.common.io.Resources;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.assertEquals;
-
-public class AvroPipelineTest {
-
-  private File inputFile;
-  private File outputDir;
-
-  @Rule
-  public final TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @Before
-  public void setUp() throws IOException {
-    inputFile = tmpDir.newFile("test.avro");
-    outputDir = tmpDir.newFolder("out");
-    outputDir.delete();
-  }
-
-  @Test
-  public void testGeneric() throws Exception {
-    Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
-    GenericRecord savedRecord = new GenericData.Record(schema);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), schema);
-
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<GenericRecord> input = p.apply(
-        AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
-    input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    res.close();
-
-    List<GenericRecord> records = readGenericFile();
-    assertEquals(Lists.newArrayList(savedRecord), records);
-  }
-
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
-    FileOutputStream outputStream = new FileOutputStream(this.inputFile);
-    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
-
-    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter)) {
-      dataFileWriter.create(schema, outputStream);
-      for (GenericRecord record : genericRecords) {
-        dataFileWriter.append(record);
-      }
-    }
-    outputStream.close();
-  }
-
-  private List<GenericRecord> readGenericFile() throws IOException {
-    List<GenericRecord> records = Lists.newArrayList();
-    GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
-    try (DataFileReader<GenericRecord> dataFileReader =
-             new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) {
-      for (GenericRecord record : dataFileReader) {
-        records.add(record);
-      }
-    }
-    return records;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
deleted file mode 100644
index 667e949..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Iterables;
-import java.util.Arrays;
-import java.util.List;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class CombineGloballyTest {
-
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
-    res.close();
-  }
-
-  public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
-
-    @Override
-    public StringBuilder createAccumulator() {
-      // return null to differentiate from an empty string
-      return null;
-    }
-
-    @Override
-    public StringBuilder addInput(StringBuilder accumulator, String input) {
-      return combine(accumulator, input);
-    }
-
-    @Override
-    public StringBuilder mergeAccumulators(Iterable<StringBuilder> accumulators) {
-      StringBuilder sb = new StringBuilder();
-      for (StringBuilder accum : accumulators) {
-        if (accum != null) {
-          sb.append(accum);
-        }
-      }
-      return sb;
-    }
-
-    @Override
-    public String extractOutput(StringBuilder accumulator) {
-      return accumulator.toString();
-    }
-
-    private static StringBuilder combine(StringBuilder accum, String datum) {
-      if (accum == null) {
-        return new StringBuilder(datum);
-      } else {
-        accum.append(",").append(datum);
-        return accum;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
deleted file mode 100644
index f9d5b46..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CombinePerKeyTest {
-
-    private static final List<String> WORDS =
-        ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
-    @Test
-    public void testRun() {
-        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-        PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-        PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
-        EvaluationResult res = SparkPipelineRunner.create().run(p);
-        Map<String, Long> actualCnts = new HashMap<>();
-        for (KV<String, Long> kv : res.get(cnts)) {
-            actualCnts.put(kv.getKey(), kv.getValue());
-        }
-        res.close();
-        Assert.assertEquals(8, actualCnts.size());
-        Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
-    }
-
-    private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
-      @Override
-      public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
-              @Override
-              public void processElement(ProcessContext processContext) throws Exception {
-                  processContext.output(KV.of(processContext.element(), 1L));
-              }
-          })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of()));
-          return withLongs.apply(Sum.<T>longsPerKey());
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
deleted file mode 100644
index 7495aeb..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.junit.Test;
-
-/**
- * A test based on {@code DeDupExample} from the SDK.
- */
-public class DeDupTest {
-
-  private static final String[] LINES_ARRAY = {
-      "hi there", "hello", "hi there",
-      "hi", "hello"};
-  private static final List<String> LINES = Arrays.asList(LINES_ARRAY);
-  private static final Set<String> EXPECTED_SET =
-      ImmutableSet.of("hi there", "hi", "hello");
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
-
-    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SET);
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    res.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
deleted file mode 100644
index 2b0947f..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import java.io.Serializable;
-import org.junit.Test;
-
-public class DoFnOutputTest implements Serializable {
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> strings = pipeline.apply(Create.of("a"));
-    // Test that values written from startBundle() and finishBundle() are written to
-    // the output
-    PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
-      @Override
-      public void startBundle(Context c) throws Exception {
-        c.output("start");
-      }
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element());
-      }
-      @Override
-      public void finishBundle(Context c) throws Exception {
-        c.output("finish");
-      }
-    }));
-
-    DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish");
-
-    EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
-    res.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
deleted file mode 100644
index 6c89ca1..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.List;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class EmptyInputTest {
-
-  @Test
-  public void test() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    Pipeline p = Pipeline.create(options);
-    List<String> empty = Collections.emptyList();
-    PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    assertEquals("", Iterables.getOnlyElement(res.get(output)));
-    res.close();
-  }
-
-  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
-    @Override
-    public String apply(Iterable<String> input) {
-      StringBuilder all = new StringBuilder();
-      for (String item : input) {
-        if (!item.isEmpty()) {
-          if (all.length() == 0) {
-            all.append(item);
-          } else {
-            all.append(",");
-            all.append(item);
-          }
-        }
-      }
-      return all.toString();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
deleted file mode 100644
index 579ada5..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import java.io.File;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.assertEquals;
-
-public class HadoopFileFormatPipelineTest {
-
-  private File inputFile;
-  private File outputFile;
-
-  @Rule
-  public final TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @Before
-  public void setUp() throws IOException {
-    inputFile = tmpDir.newFile("test.seq");
-    outputFile = tmpDir.newFolder("out");
-    outputFile.delete();
-  }
-
-  @Test
-  public void testSequenceFile() throws Exception {
-    populateFile();
-
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    @SuppressWarnings("unchecked")
-    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
-        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
-    HadoopIO.Read.Bound<IntWritable,Text> read =
-        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
-    PCollection<KV<IntWritable, Text>> input = p.apply(read);
-    @SuppressWarnings("unchecked")
-    Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
-        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
-    @SuppressWarnings("unchecked")
-    HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
-        outputFormatClass, IntWritable.class, Text.class);
-    input.apply(write.withoutSharding());
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    res.close();
-
-    IntWritable key = new IntWritable();
-    Text value = new Text();
-    try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
-      int i = 0;
-      while (reader.next(key, value)) {
-        assertEquals(i, key.get());
-        assertEquals("value-" + i, value.toString());
-        i++;
-      }
-    }
-  }
-
-  private void populateFile() throws IOException {
-    IntWritable key = new IntWritable();
-    Text value = new Text();
-    try (Writer writer = SequenceFile.createWriter(
-        new Configuration(),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-        Writer.file(new Path(this.inputFile.toURI())))) {
-      for (int i = 0; i < 5; i++) {
-        key.set(i);
-        value.set("value-" + i);
-        writer.append(key, value);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
deleted file mode 100644
index 2df8493..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.ApproximateUnique;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MultiOutputWordCountTest {
-
-  private static final TupleTag<String> upper = new TupleTag<>();
-  private static final TupleTag<String> lower = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
-  private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
-
-  @Test
-  public void testRun() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
-    PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
-    PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
-    PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
-    PCollectionList<String> list = PCollectionList.of(w1).and(w2);
-
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-    PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
-    CountWords countWords = new CountWords(regexView);
-    PCollectionTuple luc = union.apply(countWords);
-    PCollection<Long> unique = luc.get(lowerCnts).apply(
-        ApproximateUnique.<KV<String, Long>>globally(16));
-
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
-    Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
-    Assert.assertEquals("are", actualLower.iterator().next().getKey());
-    Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
-    Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
-    Iterable<Long> actualUniqCount = res.get(unique);
-    Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
-    int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
-    Assert.assertEquals(18, actualTotalWords);
-    int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
-    Assert.assertEquals(6, actualMaxWordLength);
-    AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
-        .getTotalWordsAggregator());
-    Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
-
-    res.close();
-  }
-
-  /**
-   * A DoFn that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
-        new Sum.SumIntegerFn());
-    private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
-        new Max.MaxIntegerFn());
-    private final PCollectionView<String> regex;
-
-    ExtractWordsFn(PCollectionView<String> regex) {
-      this.regex = regex;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] words = c.element().split(c.sideInput(regex));
-      for (String word : words) {
-        totalWords.addValue(1);
-        if (!word.isEmpty()) {
-          maxWordLength.addValue(word.length());
-          if (Character.isLowerCase(word.charAt(0))) {
-            c.output(word);
-          } else {
-            c.sideOutput(upper, word);
-          }
-        }
-      }
-    }
-  }
-
-  public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
-
-    private final PCollectionView<String> regex;
-    private final ExtractWordsFn extractWordsFn;
-
-    public CountWords(PCollectionView<String> regex) {
-      this.regex = regex;
-      this.extractWordsFn = new ExtractWordsFn(regex);
-    }
-
-    @Override
-    public PCollectionTuple apply(PCollection<String> lines) {
-      // Convert lines of text into individual words.
-      PCollectionTuple lowerUpper = lines
-          .apply(ParDo.of(extractWordsFn)
-              .withSideInputs(regex)
-              .withOutputTags(lower, TupleTagList.of(upper)));
-      lowerUpper.get(lower).setCoder(StringUtf8Coder.of());
-      lowerUpper.get(upper).setCoder(StringUtf8Coder.of());
-      PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count
-          .<String>perElement());
-      PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count
-          .<String>perElement());
-      return PCollectionTuple
-          .of(lowerCnts, lowerCounts)
-          .and(upperCnts, upperCounts);
-    }
-
-    Aggregator<Integer, Integer> getTotalWordsAggregator() {
-      return extractWordsFn.totalWords;
-    }
-  }
-}