Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:59:10 UTC

[45/50] [abbrv] incubator-beam git commit: Add spark-streaming support to spark-dataflow

Add spark-streaming support to spark-dataflow

Add support for application name and streaming (default: false)

Add pipeline options for streaming
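
A minimal sketch of wiring these options up, assuming SparkStreamingPipelineOptionsFactory exposes a create() method (its contents are not shown in this diff):

    // Hypothetical factory call; only the class name appears in this change.
    SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create();
    options.setAppName("streaming-wordcount");  // via ApplicationNameOptions
    options.setSparkMaster("local[1]");         // the streaming default below
    options.setStreaming(true);                 // default is false
    Pipeline p = Pipeline.create(options);
    EvaluationResult result = SparkPipelineRunner.create(options).run(p);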

Add print output as an unbounded write

Add default window strategy to represent Spark streaming micro-batches as fixed windows
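
For example, with the standard SDK windowing transform, a 1-second fixed window corresponds to a 1-second micro-batch (window size chosen here purely for illustration):

    PCollection<String> windowed = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));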

This translator translates Dataflow transformations into Spark (and Spark streaming) transformations, which will help support streaming transformations separately

Expose through the SparkPipelineTranslator

The Evaluator now uses the SparkPipelineTranslator to translate
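
The wiring, as the SparkPipelineRunner change below shows, looks roughly like this; the streaming translator wraps the batch one, presumably delegating back to it for transforms it does not handle itself:

    // Batch pipelines use the plain translator:
    SparkPipelineTranslator translator = new TransformTranslator.Translator();
    // Streaming pipelines wrap it, presumably delegating for shared transforms:
    SparkPipelineTranslator streamingTranslator =
        new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
    pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, streamingTranslator));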

Add default application name

StreamingEvaluationContext to support DStream evaluation. Expose members and methods in EvaluationContext
for inheritors

Use configured app name in options

A TransformTranslator for streaming

Add support for spark streaming execution in the runner

Fix comment

Create an input stream from a queue - mainly intended for testing

Add support to create input stream from queued values
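
Based on the CreateStream.fromQueue(...) signature added below, a test pipeline would consume queued micro-batches roughly like this (the explicit coder call is an assumption, since the primitive output does not carry one):

    // Each inner Iterable becomes one micro-batch of the test stream.
    Iterable<Iterable<String>> queued = Arrays.<Iterable<String>>asList(
        Arrays.asList("hi there", "hi"),
        Arrays.asList("hi sue bob", "hi sue"));
    PCollection<String> input = p
        .apply(CreateStream.fromQueue(queued))
        .setCoder(StringUtf8Coder.of());  // assumed to be required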

Override method to expose in package

Test WordCount in streaming; just print the output for now

Streaming print-to-console is a transformation of PCollection to PDone

Rename to CreateStream to distinguish it from Dataflow's Create

It seems that in Spark 1.3.1, short-lived streaming jobs (like unit tests) fail. This may be related to SPARK-7930, which was fixed in 1.4.0, so the version was bumped.

Expose some methods and add a method to check whether an RDDHolder exists

make context final

Streaming default should be local[1] to support unit tests

No need for a recurring context. Expose additional parent methods. Add a RUNNING state while the stream is running.

The WordCount test runs at a 1-second interval and compares against the expected output, as in batch.

Void Create triggers a no-input transformation

Transformations and output operations can be applied to streams/bounded collections in the pipeline

foreachRDD is used for the PDone transformation
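
The streaming translator itself appears later in this commit, so the following is only a sketch of the shape such a translation takes: terminating the DStream lineage with an output action, here printing the first elements of each micro-batch (10 matches ConsoleIO's default):

    dstream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        for (String element : rdd.take(10)) {
          System.out.println(element);
        }
        return null;
      }
    });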

Comments

SocketIO to consume a stream from a socket

Comment

Add support for Kafka input
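
Based on the KafkaIO.Read.from(...) signature added below, Kafka consumption would be declared roughly like this (broker address and topic are placeholders):

    Map<String, String> kafkaParams = ImmutableMap.of(
        "metadata.broker.list", "localhost:9092");  // placeholder broker
    PCollection<KV<String, String>> kafkaInput = p.apply(
        KafkaIO.Read.from(StringDecoder.class, StringDecoder.class,
            String.class, String.class,
            Collections.singleton("topic"), kafkaParams));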

Comments and some patching-up

Default is the same as in SparkPipelineOptions

Adding licenses

To satisfy license, Javadoc, and codestyle checks

Satisfy license, Javadoc, and codestyle checks

Check for DataflowAssertFailure because it won't propagate

Since DataflowAssert doesn't propagate failures in streaming, use Aggregators to assert
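
DataflowAssertStreaming's contents are not shown in this section; one plausible shape, assuming EvaluationResult exposes aggregator values through a getAggregatorValue(...) method (an assumption here), is:

    // DataflowAssert failures don't surface in streaming, so count assertion
    // outcomes in Aggregators and inspect the counts after run().
    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
    int success = res.getAggregatorValue("success", Integer.class);
    int failure = res.getAggregatorValue("failure", Integer.class);
    if (success == 0 || failure > 0) {
      throw new AssertionError("Stream assertion failed");
    }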

Use DataflowAssertStreaming

Add kafka translation

Embedded Kafka for unit test

Kafka unit test

import order

license

WindowingHelpers by Tom White @tomwhite

Merge @tomwhite's windowing branch into mine - values are windowed values now

values are windowed values now

Input is UNBOUNDED now

Using windowing instead

batchInterval to be determined by pipeline runner

print the value, not the windowed value

remove support for optimizations, for now

batchInterval is determined by the pipeline runner now

Add streaming window pipeline visitor to determine windowing

Add windowing support in streaming unit tests

Combine.Globally is necessary so leave it

fix line length

renames

Add an implementation of GroupAlsoByWindow, which helps fix the broken grouped/combinePerKey

Line indentation

unused

codestyle

Expose runtimeContext

Make public

Use the smallest window found (fixed/sliding) as the batch duration
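
StreamingWindowPipelineDetector (added in this commit, though its body is not shown in this section) presumably tracks the smallest window while visiting the pipeline; a sketch of that bookkeeping, with the default value assumed:

    // batchDuration is org.apache.spark.streaming.Duration; the window sizes
    // are org.joda.time.Duration values from the SDK's WindowFns.
    private Duration batchDuration = new Duration(1000);  // assumed default

    private void updateBatchDuration(WindowFn<?, ?> windowFn) {
      long millis = -1;
      if (windowFn instanceof FixedWindows) {
        millis = ((FixedWindows) windowFn).getSize().getMillis();
      } else if (windowFn instanceof SlidingWindows) {
        millis = ((SlidingWindows) windowFn).getPeriod().getMillis();
      }
      if (millis > 0 && millis < batchDuration.milliseconds()) {
        batchDuration = new Duration(millis);
      }
    }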

Make FieldGetter public

Add support for windowing

codestyle

unused

Update Spark to 1.5; the Kafka dependency should be provided

Abstract Evaluator holds common evaluator code; doVisitTransform is implemented per subclass.

Added non-streaming windowing test by Tom White @tomwhite

Fixed Combine.GroupedValues and Combine.Globally to work with WindowedValues without losing window
properties. For now, Combine.PerKey is commented out until fixed to fully support WindowedValues.

Support WindowedValues, Global or not, in Combine.PerKey

After the changes made to Combine.PerKey in 3a46150, it seems that the order has changed. Since order
didn't seem relevant before the change, I don't see a reason not to change the expected value
accordingly.

Update Spark version to 1.5.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7a2e9a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7a2e9a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7a2e9a72

Branch: refs/heads/master
Commit: 7a2e9a72b0507efc6cb34c342a8d0983d63f1491
Parents: f930380
Author: Amit Sela <am...@gmail.com>
Authored: Thu Oct 22 17:41:54 2015 +0300
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:17 2016 +0000

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  20 +-
 .../com/cloudera/dataflow/io/ConsoleIO.java     |  60 +++
 .../com/cloudera/dataflow/io/CreateStream.java  |  66 +++
 .../java/com/cloudera/dataflow/io/KafkaIO.java  | 128 ++++++
 .../cloudera/dataflow/spark/DoFnFunction.java   |  21 +-
 .../dataflow/spark/EvaluationContext.java       |  78 +++-
 .../dataflow/spark/MultiDoFnFunction.java       |  31 +-
 .../dataflow/spark/SparkContextFactory.java     |  10 +-
 .../dataflow/spark/SparkPipelineEvaluator.java  |  52 +++
 .../dataflow/spark/SparkPipelineOptions.java    |  13 +-
 .../dataflow/spark/SparkPipelineRunner.java     | 108 +++--
 .../dataflow/spark/SparkPipelineTranslator.java |  27 ++
 .../dataflow/spark/SparkProcessContext.java     |  43 +-
 .../dataflow/spark/TransformTranslator.java     | 274 ++++++++-----
 .../dataflow/spark/WindowingHelpers.java        |  59 +++
 .../SparkStreamingPipelineOptions.java          |  40 ++
 .../SparkStreamingPipelineOptionsFactory.java   |  27 ++
 .../SparkStreamingPipelineOptionsRegistrar.java |  28 ++
 .../streaming/StreamingEvaluationContext.java   | 219 ++++++++++
 .../streaming/StreamingTransformTranslator.java | 409 +++++++++++++++++++
 .../StreamingWindowPipelineDetector.java        |  99 +++++
 ...ataflow.sdk.options.PipelineOptionsRegistrar |   3 +-
 .../spark/MultiOutputWordCountTest.java         |   2 +-
 .../dataflow/spark/SideEffectsTest.java         |   2 +-
 .../dataflow/spark/SimpleWordCountTest.java     |   2 +-
 .../dataflow/spark/WindowedWordCountTest.java   |  63 +++
 .../spark/streaming/KafkaStreamingTest.java     | 133 ++++++
 .../streaming/SimpleStreamingWordCountTest.java |  75 ++++
 .../utils/DataflowAssertStreaming.java          |  39 ++
 .../streaming/utils/EmbeddedKafkaCluster.java   | 315 ++++++++++++++
 30 files changed, 2255 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index df21c43..5beb1c7 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@ License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.7</java.version>
-        <spark.version>1.3.1</spark.version>
+        <spark.version>1.5.2</spark.version>
         <google-cloud-dataflow-version>1.0.0</google-cloud-dataflow-version>
     </properties>
 
@@ -282,6 +282,24 @@ License.
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>18.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
new file mode 100644
index 0000000..bc19b39
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Print to console.
+ */
+public final class ConsoleIO {
+
+  private ConsoleIO() {
+  }
+
+  public static final class Write {
+
+    private Write() {
+    }
+
+    public static <T> Unbound<T> from() {
+      return new Unbound<>(10);
+    }
+
+    public static <T> Unbound<T> from(int num) {
+      return new Unbound<>(num);
+    }
+
+    public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
+
+      private final int num;
+
+      Unbound(int num) {
+        this.num = num;
+      }
+
+      public int getNum() {
+        return num;
+      }
+
+      @Override
+      public PDone apply(PCollection<T> input) {
+        return PDone.in(input.getPipeline());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
new file mode 100644
index 0000000..9a99278
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
+/**
+ * Create an input stream from Queue.
+ *
+ * @param <T> stream type
+ */
+public final class CreateStream<T> {
+
+  private CreateStream() {
+  }
+
+  /**
+   * Define the input stream to create from queue.
+   *
+   * @param queuedValues  defines the input stream
+   * @param <T>           stream type
+   * @return the queue that defines the input stream
+   */
+  public static <T> QueuedValues<T> fromQueue(Iterable<Iterable<T>> queuedValues) {
+    return new QueuedValues<>(queuedValues);
+  }
+
+  public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
+
+    private final Iterable<Iterable<T>> queuedValues;
+
+    QueuedValues(Iterable<Iterable<T>> queuedValues) {
+      Preconditions.checkNotNull(queuedValues,
+              "need to set the queuedValues of an Create.QueuedValues transform");
+      this.queuedValues = queuedValues;
+    }
+
+    public Iterable<Iterable<T>> getQueuedValues() {
+      return queuedValues;
+    }
+
+    @Override
+    public PCollection<T> apply(PInput input) {
+      // Spark streaming micro batches are bounded by default
+      return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+          WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
new file mode 100644
index 0000000..154e6da
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
+import kafka.serializer.Decoder;
+
+/**
+ * Read stream from Kafka.
+ */
+public final class KafkaIO {
+
+  private KafkaIO() {
+  }
+
+  public static final class Read {
+
+    private Read() {
+    }
+
+    /**
+     * Define the Kafka consumption.
+     *
+     * @param keyDecoder    {@link Decoder} to decode the Kafka message key
+     * @param valueDecoder  {@link Decoder} to decode the Kafka message value
+     * @param key           Kafka message key Class
+     * @param value         Kafka message value Class
+     * @param topics        Kafka topics to subscribe
+     * @param kafkaParams   map of Kafka parameters
+     * @param <K>           Kafka message key Class type
+     * @param <V>           Kafka message value Class type
+     * @return KafkaIO Unbound input
+     */
+    public static <K, V> Unbound<K, V> from(Class<? extends Decoder<K>> keyDecoder,
+                                            Class<? extends Decoder<V>> valueDecoder,
+                                            Class<K> key,
+                                            Class<V> value, Set<String> topics,
+                                            Map<String, String> kafkaParams) {
+      return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
+    }
+
+    public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+
+      private final Class<? extends Decoder<K>> keyDecoderClass;
+      private final Class<? extends Decoder<V>> valueDecoderClass;
+      private final Class<K> keyClass;
+      private final Class<V> valueClass;
+      private final Set<String> topics;
+      private final Map<String, String> kafkaParams;
+
+      Unbound(Class<? extends Decoder<K>> keyDecoder,
+              Class<? extends Decoder<V>> valueDecoder, Class<K> key,
+              Class<V> value, Set<String> topics, Map<String, String> kafkaParams) {
+        Preconditions.checkNotNull(keyDecoder,
+            "need to set the key decoder class of a KafkaIO.Read transform");
+        Preconditions.checkNotNull(valueDecoder,
+            "need to set the value decoder class of a KafkaIO.Read transform");
+        Preconditions.checkNotNull(key,
+            "need to set the key class of aKafkaIO.Read transform");
+        Preconditions.checkNotNull(value,
+            "need to set the value class of a KafkaIO.Read transform");
+        Preconditions.checkNotNull(topics,
+            "need to set the topics of a KafkaIO.Read transform");
+        Preconditions.checkNotNull(kafkaParams,
+            "need to set the kafkaParams of a KafkaIO.Read transform");
+        this.keyDecoderClass = keyDecoder;
+        this.valueDecoderClass = valueDecoder;
+        this.keyClass = key;
+        this.valueClass = value;
+        this.topics = topics;
+        this.kafkaParams = kafkaParams;
+      }
+
+      public Class<? extends Decoder<K>> getKeyDecoderClass() {
+        return keyDecoderClass;
+      }
+
+      public Class<? extends Decoder<V>> getValueDecoderClass() {
+        return valueDecoderClass;
+      }
+
+      public Class<V> getValueClass() {
+        return valueClass;
+      }
+
+      public Class<K> getKeyClass() {
+        return keyClass;
+      }
+
+      public Set<String> getTopics() {
+        return topics;
+      }
+
+      public Map<String, String> getKafkaParams() {
+        return kafkaParams;
+      }
+
+      @Override
+      public PCollection<KV<K, V>> apply(PInput input) {
+        // Spark streaming micro batches are bounded by default
+        return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+            WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
index 542f2ec..6617c56 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import org.apache.spark.api.java.function.FlatMapFunction;
 
@@ -30,7 +31,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
  * @param <I> Input element type.
  * @param <O> Output element type.
  */
-class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
+public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
+    WindowedValue<O>> {
   private final DoFn<I, O> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
@@ -40,7 +42,7 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
    * @param runtime    Runtime to apply function in.
    * @param sideInputs Side inputs used in DoFunction.
    */
-  DoFnFunction(DoFn<I, O> fn,
+  public DoFnFunction(DoFn<I, O> fn,
                SparkRuntimeContext runtime,
                Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     this.mFunction = fn;
@@ -49,16 +51,17 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
   }
 
   @Override
-  public Iterable<O> call(Iterator<I> iter) throws Exception {
+  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
+      Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
     ctxt.setup();
     mFunction.startBundle(ctxt);
     return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O, O> {
+  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
 
-    private final List<O> outputs = new LinkedList<>();
+    private final List<WindowedValue<O>> outputs = new LinkedList<>();
 
     ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
@@ -67,6 +70,12 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
 
     @Override
     public synchronized void output(O o) {
+      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
+          WindowedValue.valueInEmptyWindows(o));
+    }
+
+    @Override
+    public synchronized void output(WindowedValue<O> o) {
       outputs.add(o);
     }
 
@@ -75,7 +84,7 @@ class DoFnFunction<I, O> implements FlatMapFunction<Iterator<I>, O> {
       outputs.clear();
     }
 
-    protected Iterator<O> getOutputIterator() {
+    protected Iterator<WindowedValue<O>> getOutputIterator() {
       return outputs.iterator();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index eb9554f..68e9d27 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -55,8 +55,8 @@ public class EvaluationContext implements EvaluationResult {
   private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
   private final Set<PValue> multireads = new LinkedHashSet<>();
   private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private final Map<PValue, Iterable<WindowedValue<?>>> pview = new LinkedHashMap<>();
-  private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
+  protected AppliedPTransform<?, ?, ?> currentTransform;
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
@@ -76,21 +76,31 @@ public class EvaluationContext implements EvaluationResult {
 
     private Iterable<T> values;
     private Coder<T> coder;
-    private JavaRDDLike<T, ?> rdd;
+    private JavaRDDLike<WindowedValue<T>, ?> rdd;
 
     RDDHolder(Iterable<T> values, Coder<T> coder) {
       this.values = values;
       this.coder = coder;
     }
 
-    RDDHolder(JavaRDDLike<T, ?> rdd) {
+    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
       this.rdd = rdd;
     }
 
-    public JavaRDDLike<T, ?> getRDD() {
+    public JavaRDDLike<WindowedValue<T>, ?> getRDD() {
       if (rdd == null) {
-        rdd = jsc.parallelize(CoderHelpers.toByteArrays(values, coder))
-            .map(CoderHelpers.fromByteFunction(coder));
+        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
+            new Function<T, WindowedValue<T>>() {
+            @Override
+            public WindowedValue<T> apply(T t) {
+             // TODO: this is wrong if T is a TimestampedValue
+              return WindowedValue.valueInEmptyWindows(t);
+            }
+        });
+        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+            WindowedValue.getValueOnlyCoder(coder);
+        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
       }
       return rdd;
     }
@@ -98,7 +108,8 @@ public class EvaluationContext implements EvaluationResult {
     public Iterable<T> getValues(PCollection<T> pcollection) {
       if (values == null) {
         coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(CoderHelpers.toByteFunction(coder));
+        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
+            .map(CoderHelpers.toByteFunction(coder));
         List<byte[]> clientBytes = bytesRDD.collect();
         values = Iterables.transform(clientBytes, new Function<byte[], T>() {
           @Override
@@ -109,25 +120,38 @@ public class EvaluationContext implements EvaluationResult {
       }
       return values;
     }
+
+    public Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
+      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
+        @Override
+        public WindowedValue<T> apply(T t) {
+          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
+        }
+      });
+    }
   }
 
-  JavaSparkContext getSparkContext() {
+  protected JavaSparkContext getSparkContext() {
     return jsc;
   }
 
-  Pipeline getPipeline() {
+  protected Pipeline getPipeline() {
     return pipeline;
   }
 
-  SparkRuntimeContext getRuntimeContext() {
+  protected SparkRuntimeContext getRuntimeContext() {
     return runtime;
   }
 
-  void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
     this.currentTransform = transform;
   }
 
-  <I extends PInput> I getInput(PTransform<I, ?> transform) {
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
+  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
@@ -135,7 +159,7 @@ public class EvaluationContext implements EvaluationResult {
     return input;
   }
 
-  <O extends POutput> O getOutput(PTransform<?, O> transform) {
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
@@ -143,20 +167,26 @@ public class EvaluationContext implements EvaluationResult {
     return output;
   }
 
-  <T> void setOutputRDD(PTransform<?, ?> transform, JavaRDDLike<T, ?> rdd) {
+  protected  <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
     setRDD((PValue) getOutput(transform), rdd);
   }
 
-  <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+  protected  <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
       Coder<T> coder) {
     pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
   }
 
-  void setPView(PValue view, Iterable<WindowedValue<?>> value) {
+  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
     pview.put(view, value);
   }
 
-  JavaRDDLike<?, ?> getRDD(PValue pvalue) {
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    PValue pvalue = (PValue) getOutput(transform);
+    return pcollections.containsKey(pvalue);
+  }
+
+  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
     RDDHolder<?> rddHolder = pcollections.get(pvalue);
     JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
     leafRdds.remove(rddHolder);
@@ -169,7 +199,7 @@ public class EvaluationContext implements EvaluationResult {
     return rdd;
   }
 
-  <T> void setRDD(PValue pvalue, JavaRDDLike<T, ?> rdd) {
+  protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) {
     try {
       rdd.rdd().setName(pvalue.getName());
     } catch (IllegalStateException e) {
@@ -185,7 +215,7 @@ public class EvaluationContext implements EvaluationResult {
   }
 
 
-  <T> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
+  <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
     return pview.get(view);
   }
 
@@ -194,7 +224,7 @@ public class EvaluationContext implements EvaluationResult {
    * actions (like saving to a file) registered on them (i.e. they are performed for side
    * effects).
    */
-  void computeOutputs() {
+  protected void computeOutputs() {
     for (RDDHolder<?> rddHolder : leafRdds) {
       JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
       rdd.rdd().cache(); // cache so that any subsequent get() is cheap
@@ -237,6 +267,12 @@ public class EvaluationContext implements EvaluationResult {
     return rddHolder.getValues(pcollection);
   }
 
+  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    return rddHolder.getWindowedValues(pcollection);
+  }
+
   @Override
   public void close() {
     SparkContextFactory.stopSparkContext(jsc);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
index 8a9f8d5..17daff0 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
@@ -19,6 +19,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
@@ -35,7 +36,8 @@ import scala.Tuple2;
  * @param <I> Input type for DoFunction.
  * @param <O> Output type for DoFunction.
  */
-class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleTag<?>, Object> {
+class MultiDoFnFunction<I, O>
+    implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> {
   private final DoFn<I, O> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final TupleTag<O> mMainOutputTag;
@@ -53,16 +55,17 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
   }
 
   @Override
-  public Iterable<Tuple2<TupleTag<?>, Object>> call(Iterator<I> iter) throws Exception {
+  public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
+      call(Iterator<WindowedValue<I>> iter) throws Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
     mFunction.startBundle(ctxt);
     ctxt.setup();
     return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, Object>> {
+  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
-    private final Multimap<TupleTag<?>, Object> outputs = LinkedListMultimap.create();
+    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
 
     ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
@@ -71,17 +74,23 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
 
     @Override
     public synchronized void output(O o) {
+      outputs.put(mMainOutputTag, windowedValue.withValue(o));
+    }
+
+    @Override
+    public synchronized void output(WindowedValue<O> o) {
       outputs.put(mMainOutputTag, o);
     }
 
     @Override
     public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      outputs.put(tag, t);
+      outputs.put(tag, windowedValue.withValue(t));
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
-      outputs.put(tupleTag, t);
+      outputs.put(tupleTag, WindowedValue.of(t, instant,
+          windowedValue.getWindows(), windowedValue.getPane()));
     }
 
     @Override
@@ -89,12 +98,14 @@ class MultiDoFnFunction<I, O> implements PairFlatMapFunction<Iterator<I>, TupleT
       outputs.clear();
     }
 
-    protected Iterator<Tuple2<TupleTag<?>, Object>> getOutputIterator() {
+    protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() {
       return Iterators.transform(outputs.entries().iterator(),
-          new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
+          new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>,
+              Tuple2<TupleTag<?>, WindowedValue<?>>>() {
         @Override
-        public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
-          return new Tuple2<TupleTag<?>, Object>(input.getKey(), input.getValue());
+        public Tuple2<TupleTag<?>, WindowedValue<?>> apply(Map.Entry<TupleTag<?>,
+            WindowedValue<?>> input) {
+          return new Tuple2<TupleTag<?>, WindowedValue<?>>(input.getKey(), input.getValue());
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
index b7570b3..97cbc20 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
@@ -34,10 +34,10 @@ final class SparkContextFactory {
   private SparkContextFactory() {
   }
 
-  public static synchronized JavaSparkContext getSparkContext(String master) {
+  public static synchronized JavaSparkContext getSparkContext(String master, String appName) {
     if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
       if (sparkContext == null) {
-        sparkContext = createSparkContext(master);
+        sparkContext = createSparkContext(master, appName);
         sparkMaster = master;
       } else if (!master.equals(sparkMaster)) {
         throw new IllegalArgumentException(String.format("Cannot reuse spark context " +
@@ -46,7 +46,7 @@ final class SparkContextFactory {
       }
       return sparkContext;
     } else {
-      return createSparkContext(master);
+      return createSparkContext(master, appName);
     }
   }
 
@@ -56,10 +56,10 @@ final class SparkContextFactory {
     }
   }
 
-  private static JavaSparkContext createSparkContext(String master) {
+  private static JavaSparkContext createSparkContext(String master, String appName) {
     SparkConf conf = new SparkConf();
     conf.setMaster(master);
-    conf.setAppName("spark dataflow pipeline job");
+    conf.setAppName(appName);
     conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
     return new JavaSparkContext(conf);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
new file mode 100644
index 0000000..6762180
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+/**
+ * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
+ */
+public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
+
+  private final EvaluationContext ctxt;
+
+  public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) {
+    super(translator);
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
+      node) {
+    @SuppressWarnings("unchecked")
+    PT transform = (PT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
+        (TransformEvaluator<PT>) translator.translate(transformClass);
+    LOG.info("Evaluating {}", transform);
+    AppliedPTransform<PInput, POutput, PT> appliedTransform =
+        AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
+    ctxt.setCurrentTransform(appliedTransform);
+    evaluator.evaluate(transform, ctxt);
+    ctxt.setCurrentTransform(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
index 0679306..e96162e 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
@@ -15,14 +15,25 @@
 
 package com.cloudera.dataflow.spark;
 
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
 import com.google.cloud.dataflow.sdk.options.Default;
 import com.google.cloud.dataflow.sdk.options.Description;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 
-public interface SparkPipelineOptions extends PipelineOptions {
+public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
+                                              ApplicationNameOptions {
   @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
   @Default.String("local[1]")
   String getSparkMaster();
 
   void setSparkMaster(String master);
+
+  @Override
+  @Default.Boolean(false)
+  boolean isStreaming();
+
+  @Override
+  @Default.String("spark dataflow pipeline job")
+  String getAppName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index 5bed6e5..e980ae3 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -20,16 +20,23 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
+
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;
+import com.cloudera.dataflow.spark.streaming.StreamingEvaluationContext;
+import com.cloudera.dataflow.spark.streaming.StreamingTransformTranslator;
+import com.cloudera.dataflow.spark.streaming.StreamingWindowPipelineDetector;
+
 /**
  * The SparkPipelineRunner translate operations defined on a pipeline to a representation
  * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
@@ -50,6 +57,8 @@ import org.slf4j.LoggerFactory;
  * options.setSparkMaster("spark://host:port");
  * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
  * }
+ *
+ * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
  */
 public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> {
 
@@ -104,16 +113,49 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
   @Override
   public EvaluationResult run(Pipeline pipeline) {
     try {
+      // validate streaming configuration
+      if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
+        throw new RuntimeException("A streaming job must be configured with " +
+            SparkStreamingPipelineOptions.class.getSimpleName() + ", found " +
+            mOptions.getClass().getSimpleName());
+      }
       LOG.info("Executing pipeline using the SparkPipelineRunner.");
+      final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
+              .getSparkMaster(), mOptions.getAppName());
+
+      if (mOptions.isStreaming()) {
+        SparkPipelineTranslator translator =
+                new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
+        // if streaming - fixed window should be defined on all UNBOUNDED inputs
+        StreamingWindowPipelineDetector streamingWindowPipelineDetector =
+            new StreamingWindowPipelineDetector(translator);
+        pipeline.traverseTopologically(streamingWindowPipelineDetector);
+        if (!streamingWindowPipelineDetector.isWindowing()) {
+          throw new IllegalStateException("Spark streaming pipeline must be windowed!");
+        }
+
+        Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
+        LOG.info("Setting Spark streaming batchInterval to " +
+            batchInterval.milliseconds() + "msec");
+        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
+
+        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+        ctxt.computeOutputs();
+
+        LOG.info("Streaming pipeline construction complete. Starting execution..");
+        ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
 
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster());
-      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
-      pipeline.traverseTopologically(new Evaluator(ctxt));
-      ctxt.computeOutputs();
+        return ctxt;
+      } else {
+        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+        SparkPipelineTranslator translator = new TransformTranslator.Translator();
+        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
+        ctxt.computeOutputs();
 
-      LOG.info("Pipeline execution complete.");
+        LOG.info("Pipeline execution complete.");
 
-      return ctxt;
+        return ctxt;
+      }
     } catch (Exception e) {
       // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
       // won't let you catch something that is not declared, so we can't catch
@@ -122,7 +164,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       // SparkProcessException), or just use the SparkException cause.
       if (e instanceof SparkException && e.getCause() != null) {
         if (e.getCause() instanceof SparkProcessContext.SparkProcessException &&
-            e.getCause().getCause() != null) {
+                e.getCause().getCause() != null) {
           throw new RuntimeException(e.getCause().getCause());
         } else {
           throw new RuntimeException(e.getCause());
@@ -133,21 +175,31 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
     }
   }
 
-  private static final class Evaluator implements Pipeline.PipelineVisitor {
+  private EvaluationContext
+      createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+      Duration batchDuration) {
+    SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
+    final JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+    return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
+  }
+
+  public abstract static class Evaluator implements Pipeline.PipelineVisitor {
+    protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
+
+    protected final SparkPipelineTranslator translator;
 
-    private final EvaluationContext ctxt;
+    protected Evaluator(SparkPipelineTranslator translator) {
+      this.translator = translator;
+    }
 
     // Set upon entering a composite node which can be directly mapped to a single
     // TransformEvaluator.
     private TransformTreeNode currentTranslatedCompositeNode;
 
-    private Evaluator(EvaluationContext ctxt) {
-      this.ctxt = ctxt;
-    }
-
     /**
      * If true, we're currently inside a subtree of a composite node which directly maps to a
-     * single TransformEvaluator; children nodes are ignored, and upon post-visiting the translated
+     * single
+     * TransformEvaluator; children nodes are ignored, and upon post-visiting the translated
      * composite node, the associated TransformEvaluator will be visited.
      */
     private boolean inTranslatedCompositeNode() {
@@ -160,10 +212,12 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
         return;
       }
 
+      //noinspection unchecked
       if (node.getTransform() != null
-          && TransformTranslator.hasTransformEvaluator(node.getTransform().getClass())) {
+              && translator.hasTranslation(
+              (Class<? extends PTransform<?, ?>>) node.getTransform().getClass())) {
         LOG.info("Entering directly-translatable composite transform: '{}'",
-            node.getFullName());
+                node.getFullName());
         LOG.debug("Composite transform class: '{}'", node.getTransform().getClass());
         currentTranslatedCompositeNode = node;
       }
@@ -176,7 +230,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       // within the tree.
       if (inTranslatedCompositeNode() && node.equals(currentTranslatedCompositeNode)) {
         LOG.info("Post-visiting directly-translatable composite transform: '{}'",
-            node.getFullName());
+                node.getFullName());
         doVisitTransform(node);
         currentTranslatedCompositeNode = null;
       }
@@ -191,23 +245,11 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       doVisitTransform(node);
     }
 
-    private <PT extends PTransform<? super PInput, POutput>>
-        void doVisitTransform(TransformTreeNode node) {
-      @SuppressWarnings("unchecked")
-      PT transform = (PT) node.getTransform();
-      @SuppressWarnings("unchecked")
-      Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-      TransformEvaluator<PT> evaluator = TransformTranslator.getTransformEvaluator(transformClass);
-      LOG.info("Evaluating {}", transform);
-      AppliedPTransform<PInput, POutput, PT> appliedTransform =
-          AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
-      ctxt.setCurrentTransform(appliedTransform);
-      evaluator.evaluate(transform, ctxt);
-      ctxt.setCurrentTransform(null);
-    }
+    protected abstract <PT extends PTransform<? super PInput, POutput>> void
+        doVisitTransform(TransformTreeNode node);
 
     @Override
-    public void visitValue(PValue pvalue, TransformTreeNode node) {
+    public void visitValue(PValue value, TransformTreeNode producer) {
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
new file mode 100644
index 0000000..ff49317
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+
+/**
+ * Translator to support translation between Dataflow transformations and Spark transformations.
+ */
+public interface SparkPipelineTranslator {
+
+  boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
+
+  TransformEvaluator<? extends PTransform<?, ?>> translate(Class<? extends PTransform<?, ?>> clazz);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index e170926..f68efb4 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -17,7 +17,6 @@ package com.cloudera.dataflow.spark;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -27,7 +26,6 @@ import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.TimerInternals;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
@@ -36,6 +34,8 @@ import com.google.cloud.dataflow.sdk.util.state.StateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,18 +44,17 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
 
-  private static final Collection<? extends BoundedWindow> GLOBAL_WINDOWS =
-      Collections.singletonList(GlobalWindow.INSTANCE);
-
+  private final DoFn<I, O> fn;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
-  protected I element;
+  protected WindowedValue<I> windowedValue;
 
   SparkProcessContext(DoFn<I, O> fn,
       SparkRuntimeContext runtime,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     fn.super();
+    this.fn = fn;
     this.mRuntimeContext = runtime;
     this.mSideInputs = sideInputs;
   }
@@ -81,6 +80,8 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
   @Override
   public abstract void output(O output);
 
+  public abstract void output(WindowedValue<O> output);
+
   @Override
   public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
     String message = "sideOutput is an unsupported operation for doFunctions, use a " +
@@ -107,27 +108,32 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
   @Override
   public I element() {
-    return element;
+    return windowedValue.getValue();
   }
 
   @Override
   public void outputWithTimestamp(O output, Instant timestamp) {
-    output(output);
+    output(WindowedValue.of(output, timestamp,
+        windowedValue.getWindows(), windowedValue.getPane()));
   }
 
   @Override
   public Instant timestamp() {
-    return Instant.now();
+    return windowedValue.getTimestamp();
   }
 
   @Override
   public BoundedWindow window() {
-    return GlobalWindow.INSTANCE;
+    if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+      throw new UnsupportedOperationException(
+          "window() is only available in the context of a DoFn marked as RequiresWindow.");
+    }
+    return Iterables.getOnlyElement(windowedValue.getWindows());
   }
 
   @Override
   public PaneInfo pane() {
-    return PaneInfo.NO_FIRING;
+    return windowedValue.getPane();
   }
 
   @Override
@@ -136,13 +142,13 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
       @Override
       public Collection<? extends BoundedWindow> windows() {
-        return GLOBAL_WINDOWS;
+        return windowedValue.getWindows();
       }
 
       @Override
       public void outputWindowedValue(O output, Instant timestamp, Collection<?
           extends BoundedWindow> windows, PaneInfo paneInfo) {
-        output(output);
+        output(WindowedValue.of(output, timestamp, windows, paneInfo));
       }
 
       @Override
@@ -159,7 +165,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
       @Override
       public PaneInfo pane() {
-        return PaneInfo.NO_FIRING;
+        return windowedValue.getPane();
       }
 
       @Override
@@ -174,7 +180,8 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
   protected abstract void clearOutput();
   protected abstract Iterator<V> getOutputIterator();
 
-  protected Iterable<V> getOutputIterable(final Iterator<I> iter, final DoFn<I, O> doFn) {
+  protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter,
+      final DoFn<I, O> doFn) {
     return new Iterable<V>() {
       @Override
       public Iterator<V> iterator() {
@@ -185,12 +192,12 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
 
   private class ProcCtxtIterator extends AbstractIterator<V> {
 
-    private final Iterator<I> inputIterator;
+    private final Iterator<WindowedValue<I>> inputIterator;
     private final DoFn<I, O> doFn;
     private Iterator<V> outputIterator;
     private boolean calledFinish = false;
 
-    ProcCtxtIterator(Iterator<I> iterator, DoFn<I, O> doFn) {
+    ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
       this.inputIterator = iterator;
       this.doFn = doFn;
       this.outputIterator = getOutputIterator();
@@ -208,7 +215,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
           return outputIterator.next();
         } else if (inputIterator.hasNext()) {
           clearOutput();
-          element = inputIterator.next();
+          windowedValue = inputIterator.next();
           try {
             doFn.processElement(SparkProcessContext.this);
           } catch (Exception e) {

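For reference, a minimal sketch of the WindowedValue plumbing the reworked
context relies on; the wrapper class and values here are illustrative only,
but the WindowedValue.of(...) call mirrors the one used in outputWithTimestamp()
above. An element now keeps its timestamp, windows and pane as it flows through
processElement(), instead of being pinned to the global window:

    import java.util.Collections;

    import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
    import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
    import com.google.cloud.dataflow.sdk.util.WindowedValue;

    import org.joda.time.Instant;

    public class WindowedValueDemo {
      public static void main(String[] args) {
        // Wrap a value together with explicit windowing metadata.
        WindowedValue<String> wv = WindowedValue.of("hello", new Instant(0),
            Collections.singletonList(GlobalWindow.INSTANCE), PaneInfo.NO_FIRING);

        // element(), timestamp() and pane() in the context simply delegate
        // to these getters on the current windowed value.
        System.out.println(wv.getValue());     // hello
        System.out.println(wv.getTimestamp()); // 1970-01-01T00:00:00.000Z
        System.out.println(wv.getPane());      // the NO_FIRING pane
      }
    }
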
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 4537aa4..560d62f 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -30,12 +30,17 @@ import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.Flatten;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -44,7 +49,7 @@ import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
+
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
@@ -77,10 +82,10 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
-  private static class FieldGetter {
+  public static class FieldGetter {
     private final Map<String, Field> fields;
 
-    FieldGetter(Class<?> clazz) {
+    public FieldGetter(Class<?> clazz) {
       this.fields = Maps.newHashMap();
       for (Field f : clazz.getDeclaredFields()) {
         f.setAccessible(true);
@@ -105,11 +110,11 @@ public final class TransformTranslator {
       @Override
       public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
         PCollectionList<T> pcs = context.getInput(transform);
-        JavaRDD<T>[] rdds = new JavaRDD[pcs.size()];
+        JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
         for (int i = 0; i < rdds.length; i++) {
-          rdds[i] = (JavaRDD<T>) context.getRDD(pcs.get(i));
+          rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
         }
-        JavaRDD<T> rdd = context.getSparkContext().union(rdds);
+        JavaRDD<WindowedValue<T>> rdd = context.getSparkContext().union(rdds);
         context.setOutputRDD(transform, rdd);
       }
     };
@@ -120,8 +125,8 @@ public final class TransformTranslator {
       @Override
       public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<KV<K, V>, ?> inRDD =
-            (JavaRDDLike<KV<K, V>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
         @SuppressWarnings("unchecked")
         KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
         Coder<K> keyCoder = coder.getKeyCoder();
@@ -129,10 +134,13 @@ public final class TransformTranslator {
 
         // Use coders to convert objects in the PCollection to byte arrays, so they
         // can be transferred over the network for the shuffle.
-        JavaRDDLike<KV<K, Iterable<V>>, ?> outRDD = fromPair(toPair(inRDD)
+        JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair(
+              toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction()))
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder))
             .groupByKey()
-            .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)));
+            .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)))
+            // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK
+            .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction());
         context.setOutputRDD(transform, outRDD);
       }
     };
@@ -144,11 +152,12 @@ public final class TransformTranslator {
     return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
       @Override
       public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
-        Combine.KeyedCombineFn<K, VI, ?, VI> keyed = GROUPED_FG.get("fn", transform);
+        Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
         @SuppressWarnings("unchecked")
-        JavaRDDLike<KV<K, Iterable<VI>>, ?> inRDD =
-            (JavaRDDLike<KV<K, Iterable<VI>>, ?>) context.getInputRDD(transform);
-        context.setOutputRDD(transform, inRDD.map(new KVFunction<>(keyed)));
+        JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
+        context.setOutputRDD(transform,
+            inRDD.map(new KVFunction<>(keyed)));
       }
     };
   }
@@ -163,7 +172,8 @@ public final class TransformTranslator {
         final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<I, ?> inRdd = (JavaRDDLike<I, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<I>, ?> inRdd =
+            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
 
         final Coder<I> iCoder = context.getInput(transform).getCoder();
         final Coder<A> aCoder;
@@ -176,7 +186,9 @@ public final class TransformTranslator {
 
         // Use coders to convert objects in the PCollection to byte arrays, so they
         // can be transferred over the network for the shuffle.
-        JavaRDD<byte[]> inRddBytes = inRdd.map(CoderHelpers.toByteFunction(iCoder));
+        JavaRDD<byte[]> inRddBytes = inRdd
+            .map(WindowingHelpers.<I>unwindowFunction())
+            .map(CoderHelpers.toByteFunction(iCoder));
 
         /*A*/ byte[] acc = inRddBytes.aggregate(
             CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
@@ -206,7 +218,8 @@ public final class TransformTranslator {
         JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
             // don't use Guava's ImmutableList.of as output may be null
             CoderHelpers.toByteArrays(Collections.singleton(output), coder));
-        context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder)));
+        context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
+            .map(WindowingHelpers.<O>windowFunction()));
       }
     };
   }
@@ -220,14 +233,14 @@ public final class TransformTranslator {
         final Combine.KeyedCombineFn<K, VI, VA, VO> keyed =
             COMBINE_PERKEY_FG.get("fn", transform);
         @SuppressWarnings("unchecked")
-        JavaRDDLike<KV<K, VI>, ?> inRdd =
-            (JavaRDDLike<KV<K, VI>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd =
+            (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform);
 
         @SuppressWarnings("unchecked")
         KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
         Coder<K> keyCoder = inputCoder.getKeyCoder();
         Coder<VI> viCoder = inputCoder.getValueCoder();
-        Coder<VA> vaCoder = null;
+        Coder<VA> vaCoder;
         try {
           vaCoder = keyed.getAccumulatorCoder(
               context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
@@ -243,18 +256,35 @@ public final class TransformTranslator {
         // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark
         // provides a way to include keys in the arguments of combine/merge functions, we won't
         // need to duplicate the keys anymore.
-        JavaPairRDD<K, KV<K, VI>> inRddDuplicatedKeyPair = inRdd.mapToPair(
-            new PairFunction<KV<K, VI>, K, KV<K, VI>>() {
-              @Override
-              public Tuple2<K, KV<K, VI>> call(KV<K, VI> kv) {
-                return new Tuple2<>(kv.getKey(), kv);
-              }
-            });
+
+        // The key has to be windowed as well, in order to group by window.
+        JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
+            inRdd.mapToPair(
+                new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
+                    WindowedValue<KV<K, VI>>>() {
+                  @Override
+                  public Tuple2<WindowedValue<K>,
+                      WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) {
+                    WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
+                        kv.getTimestamp(), kv.getWindows(), kv.getPane());
+                    return new Tuple2<>(wk, kv);
+                  }
+                });
+        // windowed coders
+        final WindowedValue.FullWindowedValueCoder<K> wkCoder =
+                WindowedValue.FullWindowedValueCoder.of(keyCoder,
+                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
+        final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder =
+                WindowedValue.FullWindowedValueCoder.of(kviCoder,
+                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
+        final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder =
+                WindowedValue.FullWindowedValueCoder.of(kvaCoder,
+                context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
 
         // Use coders to convert objects in the PCollection to byte arrays, so they
         // can be transferred over the network for the shuffle.
         JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
-            .mapToPair(CoderHelpers.toByteFunction(keyCoder, kviCoder));
+            .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
 
         // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final
         // output types) since Combine.CombineFn only provides ways to merge VAs, and no way
@@ -264,60 +294,85 @@ public final class TransformTranslator {
             new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
               @Override
               public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) {
-                KV<K, VI> kvi = CoderHelpers.fromByteArray(input, kviCoder);
-                VA va = keyed.createAccumulator(kvi.getKey());
-                va = keyed.addInput(kvi.getKey(), va, kvi.getValue());
-                return CoderHelpers.toByteArray(KV.of(kvi.getKey(), va), kvaCoder);
+                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
+                VA va = keyed.createAccumulator(wkvi.getValue().getKey());
+                va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
+                WindowedValue<KV<K, VA>> wkva =
+                    WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
+                    wkvi.getWindows(), wkvi.getPane());
+                return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             },
             new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
               @Override
               public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc,
                   /*KV<K, VI>*/ byte[] input) {
-                KV<K, VA> kva = CoderHelpers.fromByteArray(acc, kvaCoder);
-                KV<K, VI> kvi = CoderHelpers.fromByteArray(input, kviCoder);
-                VA va = keyed.addInput(kva.getKey(), kva.getValue(), kvi.getValue());
-                kva = KV.of(kva.getKey(), va);
-                return CoderHelpers.toByteArray(KV.of(kva.getKey(), kva.getValue()), kvaCoder);
+                WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder);
+                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
+                VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
+                    wkvi.getValue().getValue());
+                wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
+                    wkva.getWindows(), wkva.getPane());
+                return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             },
             new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() {
               @Override
               public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1,
                   /*KV<K, VA>*/ byte[] acc2) {
-                KV<K, VA> kva1 = CoderHelpers.fromByteArray(acc1, kvaCoder);
-                KV<K, VA> kva2 = CoderHelpers.fromByteArray(acc2, kvaCoder);
-                VA va = keyed.mergeAccumulators(kva1.getKey(),
+                WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder);
+                WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder);
+                VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
                     // don't use Guava's ImmutableList.of as values may be null
-                    Collections.unmodifiableList(Arrays.asList(kva1.getValue(), kva2.getValue())));
-                return CoderHelpers.toByteArray(KV.of(kva1.getKey(), va), kvaCoder);
+                    Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
+                    wkva2.getValue().getValue())));
+                WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(),
+                    va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
+                return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             });
 
-        JavaPairRDD<K, VO> extracted = accumulatedBytes
-            .mapToPair(CoderHelpers.fromByteFunction(keyCoder, kvaCoder))
+        JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes
+            .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
             .mapValues(
-                new Function<KV<K, VA>, VO>() {
+                new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() {
                   @Override
-                  public VO call(KV<K, VA> acc) {
-                    return keyed.extractOutput(acc.getKey(), acc.getValue());
+                  public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) {
+                    return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
+                        acc.getValue().getValue()), acc.getTimestamp(),
+                        acc.getWindows(), acc.getPane());
                   }
                 });
-        context.setOutputRDD(transform, fromPair(extracted));
+
+        context.setOutputRDD(transform,
+            fromPair(extracted)
+            .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() {
+              @Override
+              public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo)
+                  throws Exception {
+                WindowedValue<VO> wvo = kwvo.getValue();
+                KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
+                return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
+              }
+            }));
       }
     };
   }
 
-  private static final class KVFunction<K, V> implements Function<KV<K, Iterable<V>>, KV<K, V>> {
-    private final Combine.KeyedCombineFn<K, V, ?, V> keyed;
+  private static final class KVFunction<K, VI, VO>
+      implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> {
+    private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed;
 
-    KVFunction(Combine.KeyedCombineFn<K, V, ?, V> keyed) {
+    KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) {
       this.keyed = keyed;
     }
 
     @Override
-    public KV<K, V> call(KV<K, Iterable<V>> kv) throws Exception {
-      return KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue()));
+    public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv)
+        throws Exception {
+      KV<K, Iterable<VI>> kv = windowedKv.getValue();
+      return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
+          windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
     }
   }
 
@@ -348,7 +403,8 @@ public final class TransformTranslator {
                 context.getRuntimeContext(),
                 getSideInputs(transform.getSideInputs(), context));
         @SuppressWarnings("unchecked")
-        JavaRDDLike<I, ?> inRDD = (JavaRDDLike<I, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<I>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
         context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
       }
     };
@@ -368,17 +424,21 @@ public final class TransformTranslator {
             getSideInputs(transform.getSideInputs(), context));
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<I, ?> inRDD = (JavaRDDLike<I, ?>) context.getInputRDD(transform);
-        JavaPairRDD<TupleTag<?>, Object> all = inRDD
+        JavaRDDLike<WindowedValue<I>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+        JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
             .mapPartitionsToPair(multifn)
             .cache();
 
         PCollectionTuple pct = context.getOutput(transform);
         for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
           @SuppressWarnings("unchecked")
-          JavaPairRDD<TupleTag<?>, Object> filtered =
+          JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
               all.filter(new TupleTagFilter(e.getKey()));
-          context.setRDD(e.getValue(), filtered.values());
+          @SuppressWarnings("unchecked")
+          // Object is the best we can do since different outputs can have different tags
+          JavaRDD<WindowedValue<Object>> values = (JavaRDD) filtered.values();
+          context.setRDD(e.getValue(), values);
         }
       }
     };
@@ -390,7 +450,8 @@ public final class TransformTranslator {
       @Override
       public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
         String pattern = transform.getFilepattern();
-        JavaRDD<String> rdd = context.getSparkContext().textFile(pattern);
+        JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
+                .map(WindowingHelpers.<String>windowFunction());
         context.setOutputRDD(transform, rdd);
       }
     };
@@ -401,9 +462,11 @@ public final class TransformTranslator {
       @Override
       public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaPairRDD<T, Void> last = ((JavaRDDLike<T, ?>) context.getInputRDD(transform))
+        JavaPairRDD<T, Void> last =
+            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+            .map(WindowingHelpers.<T>unwindowFunction())
             .mapToPair(new PairFunction<T, T,
-                Void>() {
+                    Void>() {
               @Override
               public Tuple2<T, Void> call(T t) throws Exception {
                 return new Tuple2<>(t, null);
@@ -431,13 +494,13 @@ public final class TransformTranslator {
                                  AvroKeyInputFormat.class,
                                  AvroKey.class, NullWritable.class,
                                  new Configuration()).keys();
-        JavaRDD<T> rdd = avroFile.map(
+        JavaRDD<WindowedValue<T>> rdd = avroFile.map(
             new Function<AvroKey<T>, T>() {
               @Override
               public T call(AvroKey<T> key) {
                 return key.datum();
               }
-            });
+            }).map(WindowingHelpers.<T>windowFunction());
         context.setOutputRDD(transform, rdd);
       }
     };
@@ -456,7 +519,8 @@ public final class TransformTranslator {
         AvroJob.setOutputKeySchema(job, transform.getSchema());
         @SuppressWarnings("unchecked")
         JavaPairRDD<AvroKey<T>, NullWritable> last =
-            ((JavaRDDLike<T, ?>) context.getInputRDD(transform))
+            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+            .map(WindowingHelpers.<T>unwindowFunction())
             .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
               @Override
               public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
@@ -484,12 +548,13 @@ public final class TransformTranslator {
             transform.getFormatClass(),
             transform.getKeyClass(), transform.getValueClass(),
             new Configuration());
-        JavaRDD<KV<K, V>> rdd = file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+        JavaRDD<WindowedValue<KV<K, V>>> rdd =
+            file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
           @Override
           public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
             return KV.of(t2._1(), t2._2());
           }
-        });
+        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
         context.setOutputRDD(transform, rdd);
       }
     };
@@ -500,8 +565,9 @@ public final class TransformTranslator {
       @Override
       public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaPairRDD<K, V> last = ((JavaRDDLike<KV<K, V>, ?>) context
+        JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context
             .getInputRDD(transform))
+            .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
             .mapToPair(new PairFunction<KV<K, V>, K, V>() {
               @Override
               public Tuple2<K, V> call(KV<K, V> t) throws Exception {
@@ -576,12 +642,25 @@ public final class TransformTranslator {
     rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
   }
 
-  private static <T> TransformEvaluator<Window.Bound<T>> window() {
+  static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
-        // TODO: detect and support non-global windows
-        context.setOutputRDD(transform, context.getInputRDD(transform));
+        @SuppressWarnings("unchecked")
+        JavaRDDLike<WindowedValue<T>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        if (windowFn instanceof GlobalWindows) {
+          context.setOutputRDD(transform, inRDD);
+        } else {
+          @SuppressWarnings("unchecked")
+          DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+          DoFnFunction<T, T> dofn =
+                  new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
+          context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
+        }
       }
     };
   }
@@ -603,9 +682,9 @@ public final class TransformTranslator {
     return new TransformEvaluator<View.AsSingleton<T>>() {
       @Override
       public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
-        Iterable<T> input = context.get(context.getInput(transform));
-        context.setPView(context.getOutput(transform), Iterables.transform(input,
-            new WindowingFunction<T>()));
+        Iterable<? extends WindowedValue<?>> iter =
+                context.getWindowedValues(context.getInput(transform));
+        context.setPView(context.getOutput(transform), iter);
       }
     };
   }
@@ -614,9 +693,9 @@ public final class TransformTranslator {
     return new TransformEvaluator<View.AsIterable<T>>() {
       @Override
       public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
-        Iterable<T> input = context.get(context.getInput(transform));
-        context.setPView(context.getOutput(transform), Iterables.transform(input,
-            new WindowingFunction<T>()));
+        Iterable<? extends WindowedValue<?>> iter =
+                context.getWindowedValues(context.getInput(transform));
+        context.setPView(context.getOutput(transform), iter);
       }
     };
   }
@@ -625,23 +704,15 @@ public final class TransformTranslator {
     return new TransformEvaluator<View.CreatePCollectionView<R, W>>() {
       @Override
       public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) {
-        Iterable<WindowedValue<?>> iter = Iterables.transform(
-            context.get(context.getInput(transform)), new WindowingFunction<R>());
+        Iterable<? extends WindowedValue<?>> iter =
+            context.getWindowedValues(context.getInput(transform));
         context.setPView(context.getOutput(transform), iter);
       }
     };
   }
 
-  private static class WindowingFunction<R> implements com.google.common.base.Function<R,
-      WindowedValue<?>> {
-    @Override
-    public WindowedValue<R> apply(R t) {
-      return WindowedValue.valueInGlobalWindow(t);
-    }
-  }
-
   private static final class TupleTagFilter<V>
-      implements Function<Tuple2<TupleTag<V>, Object>, Boolean> {
+      implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
 
     private final TupleTag<V> tag;
 
@@ -650,7 +721,7 @@ public final class TransformTranslator {
     }
 
     @Override
-    public Boolean call(Tuple2<TupleTag<V>, Object> input) {
+    public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
       return tag.equals(input._1());
     }
   }
@@ -663,9 +734,11 @@ public final class TransformTranslator {
     } else {
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap();
       for (PCollectionView<?> view : views) {
-        Iterable<WindowedValue<?>> collectionView = context.getPCollectionView(view);
+        Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
         Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
-        BroadcastHelper<?> helper = BroadcastHelper.create(collectionView, coderInternal);
+        @SuppressWarnings("unchecked")
+        BroadcastHelper<?> helper =
+            BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
         //broadcast side inputs
         helper.broadcast(context.getSparkContext());
         sideInputs.put(view.getTagInternal(), helper);
@@ -702,8 +775,8 @@ public final class TransformTranslator {
     return EVALUATORS.containsKey(clazz);
   }
 
-  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT> getTransformEvaluator(Class<PT>
-                                                                                           clazz) {
+  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+  getTransformEvaluator(Class<PT> clazz) {
     @SuppressWarnings("unchecked")
     TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
     if (transform == null) {
@@ -711,4 +784,21 @@ public final class TransformTranslator {
     }
     return transform;
   }
+
+  /**
+   * Translator matches a Dataflow transformation with its appropriate evaluator.
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      return hasTransformEvaluator(clazz);
+    }
+
+    @Override
+    public TransformEvaluator<? extends PTransform<?, ?>> translate(
+        Class<? extends PTransform<?, ?>> clazz) {
+      return getTransformEvaluator(clazz);
+    }
+  }
 }

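A usage sketch for the new Translator; the wrapper class and the TextIO
transform are illustrative, not part of the commit. The runner-side visitor
asks the translator whether a transform class is supported before fetching its
evaluator, which is what lets a streaming translator be substituted without
changing the visitor:

    package com.cloudera.dataflow.spark;

    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.transforms.PTransform;

    public class TranslatorDemo {
      public static void main(String[] args) {
        SparkPipelineTranslator translator = new TransformTranslator.Translator();

        PTransform<?, ?> transform = TextIO.Read.from("input.txt");
        @SuppressWarnings("unchecked")
        Class<? extends PTransform<?, ?>> clazz =
            (Class<? extends PTransform<?, ?>>) transform.getClass();

        if (translator.hasTranslation(clazz)) {
          // The pipeline visitor would now call evaluator.evaluate(transform, context).
          TransformEvaluator<? extends PTransform<?, ?>> evaluator =
              translator.translate(clazz);
          System.out.println("found evaluator: " + evaluator);
        }
      }
    }
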
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a2e9a72/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
new file mode 100644
index 0000000..90600b2
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/WindowingHelpers.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Helper functions for working with windows.
+ */
+public final class WindowingHelpers {
+  private WindowingHelpers() {
+  }
+
+  /**
+   * A function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * and pane.
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> Function<T, WindowedValue<T>> windowFunction() {
+    return new Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> call(T t) {
+        return WindowedValue.valueInEmptyWindows(t);
+      }
+    };
+  }
+
+  /**
+   * A function for extracting the value from a {@link WindowedValue}.
+   *
+   * @param <T>   The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> Function<WindowedValue<T>, T> unwindowFunction() {
+    return new Function<WindowedValue<T>, T>() {
+      @Override
+      public T call(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
+}
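
A minimal usage sketch (the Spark context setup and collection contents are
illustrative): values are wrapped as they enter the translated pipeline and
unwrapped again at windowing-agnostic boundaries, exactly the pattern the
TextIO, Avro and Hadoop evaluators above follow:

    package com.cloudera.dataflow.spark;

    import java.util.Arrays;

    import com.google.cloud.dataflow.sdk.util.WindowedValue;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class WindowingHelpersDemo {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[1]", "windowing-helpers-demo");

        // Raw values enter the pipeline...
        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "b", "c"));

        // ...get wrapped so that each element carries windowing metadata...
        JavaRDD<WindowedValue<String>> windowed =
            lines.map(WindowingHelpers.<String>windowFunction());

        // ...and get unwrapped before a windowing-agnostic step such as a
        // coder-based shuffle or a file write.
        JavaRDD<String> unwound = windowed.map(WindowingHelpers.<String>unwindowFunction());

        System.out.println(unwound.collect()); // [a, b, c]
        jsc.stop();
      }
    }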