Posted to commits@beam.apache.org by am...@apache.org on 2016/11/15 11:55:23 UTC

[1/2] incubator-beam git commit: [BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9c300cde8 -> 2bc66f903


[BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext

PR 1291 review changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bef01fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bef01fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bef01fe

Branch: refs/heads/master
Commit: 1bef01fef5ff5ff9a960c85b00c2cc4aa504ce4d
Parents: 9c300cd
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Nov 13 13:57:07 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 13:35:49 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  |   4 +-
 .../spark/translation/BoundedDataset.java       | 114 ++++++++
 .../beam/runners/spark/translation/Dataset.java |  34 +++
 .../spark/translation/EvaluationContext.java    | 230 +++++++---------
 .../spark/translation/TransformTranslator.java  |  99 +++----
 .../SparkRunnerStreamingContextFactory.java     |   7 +-
 .../streaming/StreamingEvaluationContext.java   | 272 -------------------
 .../streaming/StreamingTransformTranslator.java | 135 +++++----
 .../translation/streaming/UnboundedDataset.java | 103 +++++++
 9 files changed, 464 insertions(+), 534 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 45c7f55..6bbef39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,6 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
 * The SparkRunner translates operations defined on a pipeline into a representation
 * executable by Spark, and then submits the job to Spark to be executed. If we wanted to run
@@ -136,7 +136,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
         jssc.start();
 
         // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(),
+        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
             pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
       } else {
         if (mOptions.getTimeout() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
new file mode 100644
index 0000000..774efb9
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are sometimes
+ * created from a collection of objects (using RDD parallelize) and then only used to create View
+ * objects, in which case they do not need to be converted to bytes since they are not transferred
+ * across the network until they are broadcast.
+ */
+public class BoundedDataset<T> implements Dataset {
+  // only set if creating an RDD from a static collection
+  @Nullable private transient JavaSparkContext jsc;
+
+  private Iterable<WindowedValue<T>> windowedValues;
+  private Coder<T> coder;
+  private JavaRDD<WindowedValue<T>> rdd;
+
+  BoundedDataset(JavaRDD<WindowedValue<T>> rdd) {
+    this.rdd = rdd;
+  }
+
+  BoundedDataset(Iterable<T> values, JavaSparkContext jsc, Coder<T> coder) {
+    this.windowedValues =
+        Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
+    this.jsc = jsc;
+    this.coder = coder;
+  }
+
+  @SuppressWarnings("ConstantConditions")
+  public JavaRDD<WindowedValue<T>> getRDD() {
+    if (rdd == null) {
+      WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+          WindowedValue.getValueOnlyCoder(coder);
+      rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+          .map(CoderHelpers.fromByteFunction(windowCoder));
+    }
+    return rdd;
+  }
+
+  Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+    if (windowedValues == null) {
+      WindowFn<?, ?> windowFn =
+          pcollection.getWindowingStrategy().getWindowFn();
+      Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+      final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+      if (windowFn instanceof GlobalWindows) {
+        windowedValueCoder =
+            WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+      } else {
+        windowedValueCoder =
+            WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+      }
+      JavaRDDLike<byte[], ?> bytesRDD =
+          rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
+      List<byte[]> clientBytes = bytesRDD.collect();
+      windowedValues = Iterables.transform(clientBytes,
+          new Function<byte[], WindowedValue<T>>() {
+            @Override
+            public WindowedValue<T> apply(byte[] bytes) {
+              return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
+            }
+          });
+    }
+    return windowedValues;
+  }
+
+  @Override
+  public void cache() {
+    rdd.cache();
+  }
+
+  @Override
+  public void action() {
+    rdd.count();
+  }
+
+  @Override
+  public void setName(String name) {
+    rdd.setName(name);
+  }
+
+}
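
As a usage sketch of the deferred conversion described in the class javadoc: values are held in memory until getRDD() is first called, and a dataset consumed solely to create a view never pays the coder round-trip at all. The snippet below is illustrative only (the constructor is package-private, so it assumes code living in the same translation package, and the coder choice is arbitrary):

    import java.util.Arrays;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    class BoundedDatasetUsageSketch {
      static void sketch(JavaSparkContext jsc) {
        // Values are wrapped as WindowedValues and held in memory; no RDD exists yet.
        BoundedDataset<String> dataset =
            new BoundedDataset<>(Arrays.asList("a", "b", "c"), jsc, StringUtf8Coder.of());
        // Only here are the values serialized with the coder and parallelized into an RDD.
        JavaRDD<WindowedValue<String>> rdd = dataset.getRDD();
        rdd.count();
      }
    }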

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
new file mode 100644
index 0000000..36b03fe
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import java.io.Serializable;
+
+
+/**
+ * Holder for Spark RDD/DStream.
+ */
+public interface Dataset extends Serializable {
+
+  void cache();
+
+  void action();
+
+  void setName(String name);
+}
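
The point of this small interface is that translators and the evaluation context can cache, force, and name a bounded RDD and an unbounded DStream uniformly. A hedged sketch of what the unbounded side can look like, adapted from the DStream handling removed from StreamingEvaluationContext below (the actual UnboundedDataset added by this commit may differ in detail):

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;

    class UnboundedDatasetSketch<T> implements Dataset {
      private final JavaDStream<WindowedValue<T>> dStream;

      UnboundedDatasetSketch(JavaDStream<WindowedValue<T>> dStream) {
        this.dStream = dStream;
      }

      @Override
      public void cache() {
        // Cache at the DStream level rather than per-RDD, since a windowed DStream
        // may use a different StorageLevel.
        dStream.dstream().cache();
      }

      @Override
      public void action() {
        // Register a no-op output action so Spark materializes every micro-batch.
        dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
          @Override
          public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
            rdd.count();
          }
        });
      }

      @Override
      public void setName(String name) {
        // DStreams have no settable name; no-op in this sketch.
      }
    }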

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 6ccec85..aaf7573 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -20,17 +20,15 @@ package org.apache.beam.runners.spark.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
@@ -39,17 +37,15 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
 
@@ -58,91 +54,48 @@ import org.joda.time.Duration;
  */
 public class EvaluationContext implements EvaluationResult {
   private final JavaSparkContext jsc;
-  private final Pipeline pipeline;
+  private JavaStreamingContext jssc;
   private final SparkRuntimeContext runtime;
-  private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
-  private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
-  private final Set<PValue> multireads = new LinkedHashSet<>();
+  private final Pipeline pipeline;
+  private long timeout;
+  private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
+  private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
+  private final Set<Dataset> leaves = new LinkedHashSet<>();
+  private final Set<PValue> multiReads = new LinkedHashSet<>();
   private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
   private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
-  protected AppliedPTransform<?, ?, ?> currentTransform;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private State state;
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.runtime = new SparkRuntimeContext(pipeline, jsc);
+    // A batch pipeline is blocking by nature
+    this.state = State.DONE;
   }
 
-  /**
-   * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
-   * sometimes created from a collection of objects (using RDD parallelize) and then
-   * only used to create View objects; in which case they do not need to be
-   * converted to bytes since they are not transferred across the network until they are
-   * broadcast.
-   */
-  private class RDDHolder<T> {
-
-    private Iterable<WindowedValue<T>> windowedValues;
-    private Coder<T> coder;
-    private JavaRDDLike<WindowedValue<T>, ?> rdd;
-
-    RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.windowedValues =
-          Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
-      this.coder = coder;
-    }
-
-    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
-      this.rdd = rdd;
-    }
-
-    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
-      if (rdd == null) {
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
-        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
-            .map(CoderHelpers.fromByteFunction(windowCoder));
-      }
-      return rdd;
-    }
-
-    Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
-      if (windowedValues == null) {
-        WindowFn<?, ?> windowFn =
-                pcollection.getWindowingStrategy().getWindowFn();
-        Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
-        final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
-            if (windowFn instanceof GlobalWindows) {
-              windowedValueCoder =
-                  WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
-            } else {
-              windowedValueCoder =
-                  WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
-            }
-        JavaRDDLike<byte[], ?> bytesRDD =
-            rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
-        List<byte[]> clientBytes = bytesRDD.collect();
-        windowedValues = Iterables.transform(clientBytes,
-            new Function<byte[], WindowedValue<T>>() {
-          @Override
-          public WindowedValue<T> apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
-          }
-        });
-      }
-      return windowedValues;
-    }
+  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+                           JavaStreamingContext jssc, long timeout) {
+    this(jsc, pipeline);
+    this.jssc = jssc;
+    this.timeout = timeout;
+    this.state = State.RUNNING;
   }
 
-  protected JavaSparkContext getSparkContext() {
+  JavaSparkContext getSparkContext() {
     return jsc;
   }
 
+  public JavaStreamingContext getStreamingContext() {
+    return jssc;
+  }
+
   public Pipeline getPipeline() {
     return pipeline;
   }
 
-  protected SparkRuntimeContext getRuntimeContext() {
+  public SparkRuntimeContext getRuntimeContext() {
     return runtime;
   }
 
@@ -150,11 +103,7 @@ public class EvaluationContext implements EvaluationResult {
     this.currentTransform = transform;
   }
 
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  protected <T extends PInput> T getInput(PTransform<T, ?> transform) {
+  public <T extends PInput> T getInput(PTransform<T, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
@@ -162,7 +111,7 @@ public class EvaluationContext implements EvaluationResult {
     return input;
   }
 
-  protected <T extends POutput> T getOutput(PTransform<?, T> transform) {
+  public <T extends POutput> T getOutput(PTransform<?, T> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
@@ -170,81 +119,74 @@ public class EvaluationContext implements EvaluationResult {
     return output;
   }
 
-  protected  <T> void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    setRDD((PValue) getOutput(transform), rdd);
+  public void putDataset(PTransform<?, ?> transform, Dataset dataset) {
+    putDataset((PValue) getOutput(transform), dataset);
+  }
+
+  public void putDataset(PValue pvalue, Dataset dataset) {
+    try {
+      dataset.setName(pvalue.getName());
+    } catch (IllegalStateException e) {
+      // name not set, ignore
+    }
+    datasets.put(pvalue, dataset);
+    leaves.add(dataset);
+  }
+
+  <T> void putBoundedDatasetFromValues(PTransform<?, ?> transform, Iterable<T> values,
+                                       Coder<T> coder) {
+    datasets.put((PValue) getOutput(transform), new BoundedDataset<>(values, jsc, coder));
   }
 
-  protected  <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
-      Coder<T> coder) {
-    pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
+  public <T> void putUnboundedDatasetFromQueue(
+      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+    datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
   }
 
-  public void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
+  void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
     pview.put(view, value);
   }
 
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    PValue pvalue = (PValue) getOutput(transform);
-    return pcollections.containsKey(pvalue);
+  public Dataset borrowDataset(PTransform<?, ?> transform) {
+    return borrowDataset((PValue) getInput(transform));
   }
 
-  public JavaRDDLike<?, ?> getRDD(PValue pvalue) {
-    RDDHolder<?> rddHolder = pcollections.get(pvalue);
-    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-    leafRdds.remove(rddHolder);
-    if (multireads.contains(pvalue)) {
+  public Dataset borrowDataset(PValue pvalue) {
+    Dataset dataset = datasets.get(pvalue);
+    leaves.remove(dataset);
+    if (multiReads.contains(pvalue)) {
       // Ensure the RDD is marked as cached
-      rdd.rdd().cache();
+      dataset.cache();
     } else {
-      multireads.add(pvalue);
+      multiReads.add(pvalue);
     }
-    return rdd;
+    return dataset;
   }
 
-  protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    try {
-      rdd.rdd().setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    RDDHolder<T> rddHolder = new RDDHolder<>(rdd);
-    pcollections.put(pvalue, rddHolder);
-    leafRdds.add(rddHolder);
-  }
-
-  protected JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
-    return getRDD((PValue) getInput(transform));
-  }
-
-
   <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
     return pview.get(view);
   }
 
   /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
-   * actions (like saving to a file) registered on them (i.e. they are performed for side
-   * effects).
+   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
+   * saving to a file) registered on them (i.e. they are performed for side effects).
    */
   public void computeOutputs() {
-    for (RDDHolder<?> rddHolder : leafRdds) {
-      JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-      rdd.rdd().cache(); // cache so that any subsequent get() is cheap
-      rdd.count(); // force the RDD to be computed
+    for (Dataset dataset : leaves) {
+      dataset.cache(); // cache so that any subsequent get() is cheap.
+      dataset.action(); // force computation.
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public <T> T get(PValue value) {
     if (pobjects.containsKey(value)) {
-      @SuppressWarnings("unchecked")
       T result = (T) pobjects.get(value);
       return result;
     }
     if (pcollections.containsKey(value)) {
-      JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
-      @SuppressWarnings("unchecked")
+      JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD();
       T res = (T) Iterables.getOnlyElement(rdd.collect());
       pobjects.put(value, res);
       return res;
@@ -271,27 +213,37 @@ public class EvaluationContext implements EvaluationResult {
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
+    BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
+    Iterable<WindowedValue<T>> windowedValues = boundedDataset.getValues(pcollection);
     return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
+    BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
+    return boundedDataset.getValues(pcollection);
   }
 
   @Override
   public void close(boolean gracefully) {
-    // graceful stop is used for streaming.
+    if (isStreamingPipeline()) {
+      // stop streaming context
+      if (timeout > 0) {
+        jssc.awaitTerminationOrTimeout(timeout);
+      } else {
+        jssc.awaitTermination();
+      }
+      // stop streaming context gracefully, so checkpointing (and other computations) get to
+      // finish before shutdown.
+      jssc.stop(false, gracefully);
+    }
+    state = State.DONE;
     SparkContextFactory.stopSparkContext(jsc);
   }
 
-  /** The runner is blocking. */
   @Override
   public State getState() {
-    return State.DONE;
+    return state;
   }
 
   @Override
@@ -307,9 +259,19 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    // This is no-op, since Spark runner in batch is blocking.
-    // It needs to be updated once SparkRunner supports non-blocking execution:
-    // https://issues.apache.org/jira/browse/BEAM-595
-    return State.DONE;
+    if (isStreamingPipeline()) {
+      throw new UnsupportedOperationException(
+          "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
+              + "pipelines.");
+    } else {
+      // This is no-op, since Spark runner in batch is blocking.
+      // It needs to be updated once SparkRunner supports non-blocking execution:
+      // https://issues.apache.org/jira/browse/BEAM-595
+      return State.DONE;
+    }
+  }
+
+  private boolean isStreamingPipeline() {
+    return jssc != null;
   }
 }
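
To summarize the unified lifecycle: the batch constructor starts in State.DONE because batch execution blocks, the streaming constructor starts in State.RUNNING, and close() only awaits and stops the streaming context when one was supplied. A short usage sketch (contexts and timeout are illustrative):

    import org.apache.beam.runners.spark.translation.EvaluationContext;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    class EvaluationContextUsageSketch {
      static void sketch(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc) {
        // Batch: no streaming context; state starts as DONE since batch execution blocks.
        EvaluationContext batchContext = new EvaluationContext(jsc, pipeline);

        // Streaming: same class, plus a JavaStreamingContext and timeout; starts as RUNNING.
        EvaluationContext streamingContext =
            new EvaluationContext(jsc, pipeline, jssc, 30000L /* ms, illustrative */);

        // isStreamingPipeline() == (jssc != null): close() awaits termination (or the
        // timeout), stops the streaming context gracefully, then stops Spark itself.
        streamingContext.close(true);
      }
    }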

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 2e682c4..c902ee3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.beam.runners.spark.translation;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -73,11 +72,9 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
-
 import scala.Tuple2;
 
 
@@ -101,11 +98,11 @@ public final class TransformTranslator {
         } else {
           JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
           for (int i = 0; i < rdds.length; i++) {
-            rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
+            rdds[i] = ((BoundedDataset<T>) context.borrowDataset(pcs.get(i))).getRDD();
           }
           unionRDD = context.getSparkContext().union(rdds);
         }
-        context.setOutputRDD(transform, unionRDD);
+        context.putDataset(transform, new BoundedDataset<>(unionRDD));
       }
     };
   }
@@ -116,7 +113,7 @@ public final class TransformTranslator {
       public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<KV<K, V>>> inRDD =
-            (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform);
+            ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD();
 
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
@@ -124,8 +121,9 @@ public final class TransformTranslator {
         final Accumulator<NamedAggregators> accum =
                 AccumulatorSingleton.getInstance(context.getSparkContext());
 
-        context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder,
-            context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy()));
+        context.putDataset(transform,
+            new BoundedDataset<>(GroupCombineFunctions.groupByKey(inRDD, accum, coder,
+                context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy())));
       }
     };
   }
@@ -146,16 +144,17 @@ public final class TransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>)
-                context.getInputRDD(transform);
+        JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> inRDD =
+            ((BoundedDataset<KV<K, Iterable<InputT>>>)
+                context.borrowDataset(transform)).getRDD();
 
-        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext  =
+        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
             new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(),
                 TranslationUtils.getSideInputs(transform.getSideInputs(), context),
-                    windowingStrategy);
-        context.setOutputRDD(transform, inRDD.map(new TranslationUtils.CombineGroupedValues<>(
-            combineFnWithContext)));
+                windowingStrategy);
+        context.putDataset(transform,
+            new BoundedDataset<>(inRDD.map(
+                new TranslationUtils.CombineGroupedValues<>(combineFnWithContext))));
       }
     };
   }
@@ -182,10 +181,11 @@ public final class TransformTranslator {
 
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRdd =
-            (JavaRDD<WindowedValue<InputT>>) context.getInputRDD(transform);
+            ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
 
-        context.setOutputRDD(transform, GroupCombineFunctions.combineGlobally(inRdd, combineFn,
-            iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault));
+        context.putDataset(transform,
+            new BoundedDataset<>(GroupCombineFunctions.combineGlobally(inRdd, combineFn,
+                iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault)));
       }
     };
   }
@@ -212,10 +212,11 @@ public final class TransformTranslator {
 
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<KV<K, InputT>>> inRdd =
-                (JavaRDD<WindowedValue<KV<K, InputT>>>) context.getInputRDD(transform);
+            ((BoundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getRDD();
 
-        context.setOutputRDD(transform, GroupCombineFunctions.combinePerKey(inRdd, combineFn,
-            inputCoder, runtimeContext, windowingStrategy, sideInputs));
+        context.putDataset(transform,
+            new BoundedDataset<>(GroupCombineFunctions.combinePerKey(inRdd, combineFn,
+                inputCoder, runtimeContext, windowingStrategy, sideInputs)));
       }
     };
   }
@@ -225,8 +226,8 @@ public final class TransformTranslator {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+        JavaRDD<WindowedValue<InputT>> inRDD =
+            ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         @SuppressWarnings("unchecked")
         final WindowFn<Object, ?> windowFn =
             (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
@@ -234,9 +235,9 @@ public final class TransformTranslator {
             AccumulatorSingleton.getInstance(context.getSparkContext());
         Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
-        context.setOutputRDD(transform,
-            inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(),
-                context.getRuntimeContext(), sideInputs, windowFn)));
+        context.putDataset(transform,
+            new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(),
+                context.getRuntimeContext(), sideInputs, windowFn))));
       }
     };
   }
@@ -247,8 +248,8 @@ public final class TransformTranslator {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+        JavaRDD<WindowedValue<InputT>> inRDD =
+            ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         @SuppressWarnings("unchecked")
         final WindowFn<Object, ?> windowFn =
             (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
@@ -268,7 +269,7 @@ public final class TransformTranslator {
           // Object is the best we can do since different outputs can have different tags
           JavaRDD<WindowedValue<Object>> values =
               (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
-          context.setRDD(e.getValue(), values);
+          context.putDataset(e.getValue(), new BoundedDataset<>(values));
         }
       }
     };
@@ -281,8 +282,8 @@ public final class TransformTranslator {
       public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
         String pattern = transform.getFilepattern();
         JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
-                .map(WindowingHelpers.<String>windowFunction());
-        context.setOutputRDD(transform, rdd);
+            .map(WindowingHelpers.<String>windowFunction());
+        context.putDataset(transform, new BoundedDataset<>(rdd));
       }
     };
   }
@@ -293,7 +294,7 @@ public final class TransformTranslator {
       public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaPairRDD<T, Void> last =
-            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+            ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
             .map(WindowingHelpers.<T>unwindowFunction())
             .mapToPair(new PairFunction<T, T,
                     Void>() {
@@ -331,7 +332,7 @@ public final class TransformTranslator {
                 return key.datum();
               }
             }).map(WindowingHelpers.<T>windowFunction());
-        context.setOutputRDD(transform, rdd);
+        context.putDataset(transform, new BoundedDataset<>(rdd));
       }
     };
   }
@@ -349,7 +350,7 @@ public final class TransformTranslator {
         AvroJob.setOutputKeySchema(job, transform.getSchema());
         @SuppressWarnings("unchecked")
         JavaPairRDD<AvroKey<T>, NullWritable> last =
-            ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+            ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
             .map(WindowingHelpers.<T>unwindowFunction())
             .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
               @Override
@@ -377,7 +378,7 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
             jsc.sc(), transform.getSource(), runtimeContext).toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
-        context.setOutputRDD(transform, input.cache());
+        context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }
     };
   }
@@ -388,7 +389,7 @@ public final class TransformTranslator {
       public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
         String pattern = transform.getFilepattern();
         JavaSparkContext jsc = context.getSparkContext();
-        @SuppressWarnings ("unchecked")
+        @SuppressWarnings("unchecked")
         JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
             transform.getFormatClass(),
             transform.getKeyClass(), transform.getValueClass(),
@@ -400,7 +401,7 @@ public final class TransformTranslator {
             return KV.of(t2._1(), t2._2());
           }
         }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        context.setOutputRDD(transform, rdd);
+        context.putDataset(transform, new BoundedDataset<>(rdd));
       }
     };
   }
@@ -410,8 +411,8 @@ public final class TransformTranslator {
       @Override
       public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context
-            .getInputRDD(transform))
+        JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform))
+            .getRDD()
             .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
             .mapToPair(new PairFunction<KV<K, V>, K, V>() {
               @Override
@@ -492,20 +493,20 @@ public final class TransformTranslator {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<T>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
+        JavaRDD<WindowedValue<T>> inRDD =
+            ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD();
 
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          context.setOutputRDD(transform, inRDD);
+          context.putDataset(transform, new BoundedDataset<>(inRDD));
         } else {
           @SuppressWarnings("unchecked")
           WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
           OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           Accumulator<NamedAggregators> accum =
               AccumulatorSingleton.getInstance(context.getSparkContext());
-          context.setOutputRDD(transform,
-              inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn,
-                  context.getRuntimeContext(), null, null)));
+          context.putDataset(transform,
+              new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn,
+                  context.getRuntimeContext(), null, null))));
         }
       }
     };
@@ -519,7 +520,7 @@ public final class TransformTranslator {
         // Use a coder to convert the objects in the PCollection to byte arrays, so they
         // can be transferred over the network.
         Coder<T> coder = context.getOutput(transform).getCoder();
-        context.setOutputRDDFromValues(transform, elems, coder);
+        context.putBoundedDatasetFromValues(transform, elems, coder);
       }
     };
   }
@@ -530,7 +531,7 @@ public final class TransformTranslator {
       public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
+        context.putPView(context.getOutput(transform), iter);
       }
     };
   }
@@ -541,7 +542,7 @@ public final class TransformTranslator {
       public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
+        context.putPView(context.getOutput(transform), iter);
       }
     };
   }
@@ -554,7 +555,7 @@ public final class TransformTranslator {
                            EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.setPView(context.getOutput(transform), iter);
+        context.putPView(context.getOutput(transform), iter);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index f8ee8ad..01398e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
@@ -54,7 +55,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     this.options = options;
   }
 
-  private StreamingEvaluationContext ctxt;
+  private EvaluationContext ctxt;
 
   @Override
   public JavaStreamingContext create() {
@@ -72,7 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    ctxt = new StreamingEvaluationContext(jsc, pipeline, jssc,
+    ctxt = new EvaluationContext(jsc, pipeline, jssc,
         options.getTimeout());
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
@@ -95,7 +96,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     return jssc;
   }
 
-  public StreamingEvaluationContext getCtxt() {
+  public EvaluationContext getCtxt() {
     return ctxt;
   }
 }
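
For context on why getCtxt() may be null in SparkRunner (first hunk above): Spark's checkpoint recovery only invokes the factory when no checkpoint exists. A hedged sketch of the typical wiring (the constructor arguments and the checkpointDir parameter are assumptions, not taken from this diff):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    class CheckpointRecoverySketch {
      static JavaStreamingContext sketch(Pipeline pipeline, SparkPipelineOptions options,
                                         String checkpointDir) {
        // Constructor arguments assumed for illustration.
        SparkRunnerStreamingContextFactory factory =
            new SparkRunnerStreamingContextFactory(pipeline, options);
        // getOrCreate() calls factory.create() only when no checkpoint is found under
        // checkpointDir; on recovery the context is rebuilt from the checkpoint, create()
        // is skipped, and getCtxt() therefore stays null.
        return JavaStreamingContext.getOrCreate(checkpointDir, factory);
      }
    }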

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
deleted file mode 100644
index bfba316..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.translation.EvaluationContext;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.joda.time.Duration;
-
-
-/**
- * Evaluation context for streaming pipelines.
- */
-public class StreamingEvaluationContext extends EvaluationContext {
-
-  private final JavaStreamingContext jssc;
-  private final long timeout;
-  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
-  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
-
-  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      JavaStreamingContext jssc, long timeout) {
-    super(jsc, pipeline);
-    this.jssc = jssc;
-    this.timeout = timeout;
-  }
-
-  /**
-   * DStream holder. Can also create a DStream from a supplied queue of values, but mainly for
-   * testing.
-   */
-  private class DStreamHolder<T> {
-
-    private Iterable<Iterable<T>> values;
-    private Coder<T> coder;
-    private JavaDStream<WindowedValue<T>> dStream;
-
-    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
-      this.values = values;
-      this.coder = coder;
-    }
-
-    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
-      this.dStream = dStream;
-    }
-
-    @SuppressWarnings("unchecked")
-    JavaDStream<WindowedValue<T>> getDStream() {
-      if (dStream == null) {
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
-        // create the DStream from queue
-        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-        JavaRDD<WindowedValue<T>> lastRDD = null;
-        for (Iterable<T> v : values) {
-          Iterable<WindowedValue<T>> windowedValues =
-              Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
-          JavaRDD<WindowedValue<T>> rdd = getSparkContext().parallelize(
-              CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
-                  CoderHelpers.fromByteFunction(windowCoder));
-          rddQueue.offer(rdd);
-          lastRDD = rdd;
-        }
-        // create dstream from queue, one at a time,
-        // with last as default in case batches repeat (graceful stops for example).
-        // if the stream is empty, avoid creating a default empty RDD.
-        // mainly for unit test so no reason to have this configurable.
-        dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
-            : jssc.queueStream(rddQueue, true);
-      }
-      return dStream;
-    }
-  }
-
-  <T> void setDStreamFromQueue(
-      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
-    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
-  }
-
-  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
-    setStream((PValue) getOutput(transform), dStream);
-  }
-
-  <T> void setStream(PValue pvalue, JavaDStream<WindowedValue<T>> dStream) {
-    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
-    pstreams.put(pvalue, dStreamHolder);
-    leafStreams.add(dStreamHolder);
-  }
-
-  boolean hasStream(PTransform<?, ?> transform) {
-    PValue pvalue = (PValue) getInput(transform);
-    return hasStream(pvalue);
-  }
-
-  boolean hasStream(PValue pvalue) {
-    return pstreams.containsKey(pvalue);
-  }
-
-  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
-    return getStream((PValue) getInput(transform));
-  }
-
-  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
-    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
-    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
-    leafStreams.remove(dStreamHolder);
-    return dStream;
-  }
-
-  // used to set the RDD from the DStream in the RDDHolder for transformation
-  <T> void setInputRDD(
-      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    setRDD((PValue) getInput(transform), rdd);
-  }
-
-  // used to get the RDD transformation output and use it as the DStream transformation output
-  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
-    return getRDD((PValue) getOutput(transform));
-  }
-
-  public JavaStreamingContext getStreamingContext() {
-    return jssc;
-  }
-
-  @Override
-  public void computeOutputs() {
-    super.computeOutputs(); // in case the pipeline contains bounded branches as well.
-    for (DStreamHolder<?> streamHolder : leafStreams) {
-      computeOutput(streamHolder);
-    } // force a DStream action
-  }
-
-  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
-    JavaDStream<WindowedValue<T>> dStream = streamHolder.getDStream();
-    // cache in DStream level not RDD
-    // because there could be a difference in StorageLevel if the DStream is windowed.
-    dStream.dstream().cache();
-    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
-      @Override
-      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
-        rdd.count();
-      }
-    });
-  }
-
-  @Override
-  public void close(boolean gracefully) {
-    if (timeout > 0) {
-      jssc.awaitTerminationOrTimeout(timeout);
-    } else {
-      jssc.awaitTermination();
-    }
-    // stop streaming context gracefully, so checkpointing (and other computations) get to
-    // finish before shutdown.
-    jssc.stop(false, gracefully);
-    state = State.DONE;
-    super.close(false);
-  }
-
-  private State state = State.RUNNING;
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  @Override
-  public State cancel() throws IOException {
-    throw new UnsupportedOperationException(
-        "Spark runner StreamingEvaluationContext does not support cancel.");
-  }
-
-  @Override
-  public State waitUntilFinish() {
-    throw new UnsupportedOperationException(
-        "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
-  }
-
-  @Override
-  public State waitUntilFinish(Duration duration) {
-    throw new UnsupportedOperationException(
-        "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
-  }
-
-  //---------------- override in order to expose in package
-  @Override
-  protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-    return super.getInput(transform);
-  }
-  @Override
-  protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-    return super.getOutput(transform);
-  }
-
-  @Override
-  protected JavaSparkContext getSparkContext() {
-    return super.getSparkContext();
-  }
-
-  @Override
-  protected SparkRuntimeContext getRuntimeContext() {
-    return super.getRuntimeContext();
-  }
-
-  @Override
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    super.setCurrentTransform(transform);
-  }
-
-  @Override
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return super.getCurrentTransform();
-  }
-
-  @Override
-  protected <T> void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    super.setOutputRDD(transform, rdd);
-  }
-
-  @Override
-  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
-      Coder<T> coder) {
-    super.setOutputRDDFromValues(transform, values, coder);
-  }
-
-  @Override
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    return super.hasOutputRDD(transform);
-  }
-}
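
The queue-backed test stream built by the deleted DStreamHolder.getDStream() (and, per this commit, now by UnboundedDataset) deserves a standalone illustration. A hedged, simplified sketch that skips the coder round-trip:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    class QueueStreamSketch {
      static JavaDStream<String> sketch(JavaSparkContext jsc, JavaStreamingContext jssc) {
        List<List<String>> batches =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        Queue<JavaRDD<String>> rddQueue = new LinkedBlockingQueue<>();
        JavaRDD<String> lastRDD = null;
        for (List<String> batch : batches) {
          lastRDD = jsc.parallelize(batch);
          rddQueue.offer(lastRDD); // one RDD per micro-batch
        }
        // oneAtATime = true consumes a single queued RDD per batch interval; the last RDD
        // doubles as the default so repeated batches (e.g. during a graceful stop) replay
        // it rather than an empty RDD.
        return lastRDD != null
            ? jssc.queueStream(rddQueue, true, lastRDD)
            : jssc.queueStream(rddQueue, true);
      }
    }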

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 71c27df..b30f079 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
+import org.apache.beam.runners.spark.translation.BoundedDataset;
+import org.apache.beam.runners.spark.translation.Dataset;
 import org.apache.beam.runners.spark.translation.DoFnFunction;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
@@ -71,15 +73,13 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 
 
-
 /**
 * Supports translation between a Beam transform and Spark's operations on DStreams.
  */
-public final class StreamingTransformTranslator {
+final class StreamingTransformTranslator {
 
   private StreamingTransformTranslator() {
   }
@@ -89,9 +89,8 @@ public final class StreamingTransformTranslator {
       @Override
       public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
-            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
-            ((StreamingEvaluationContext) context).getStream(transform);
+        JavaDStream<WindowedValue<T>> dstream =
+            ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream();
         dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
       }
     };
@@ -101,9 +100,9 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<Read.Unbounded<T>>() {
       @Override
       public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        sec.setStream(transform, SparkUnboundedSource.read(sec.getStreamingContext(),
-            sec.getRuntimeContext(), transform.getSource()));
+        context.putDataset(transform,
+            new UnboundedDataset<>(SparkUnboundedSource.read(context.getStreamingContext(),
+                context.getRuntimeContext(), transform.getSource())));
       }
     };
   }
@@ -112,10 +111,9 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
       @Override
       public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         Iterable<Iterable<T>> values = transform.getQueuedValues();
-        Coder<T> coder = sec.getOutput(transform).getCoder();
-        sec.setDStreamFromQueue(transform, values, coder);
+        Coder<T> coder = context.getOutput(transform).getCoder();
+        context.putUnboundedDatasetFromQueue(transform, values, coder);
       }
     };
   }
@@ -125,23 +123,23 @@ public final class StreamingTransformTranslator {
       @SuppressWarnings("unchecked")
       @Override
       public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        PCollectionList<T> pcs = sec.getInput(transform);
+        PCollectionList<T> pcs = context.getInput(transform);
         // since this is a streaming pipeline, at least one of the PCollections to "flatten" is
         // unbounded, meaning it represents a DStream.
         // So we could end up with an unbounded unified DStream.
         final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
-        for (PCollection<T> pcol: pcs.getAll()) {
-          if (sec.hasStream(pcol)) {
-            dStreams.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcol));
+        for (PCollection<T> pcol : pcs.getAll()) {
+          Dataset dataset = context.borrowDataset(pcol);
+          if (dataset instanceof UnboundedDataset) {
+            dStreams.add(((UnboundedDataset<T>) dataset).getDStream());
           } else {
-            rdds.add((JavaRDD<WindowedValue<T>>) context.getRDD(pcol));
+            rdds.add(((BoundedDataset<T>) dataset).getRDD());
           }
         }
         // start by unifying streams into a single stream.
         JavaDStream<WindowedValue<T>> unifiedStreams =
-            sec.getStreamingContext().union(dStreams.remove(0), dStreams);
+            context.getStreamingContext().union(dStreams.remove(0), dStreams);
         // now unify in RDDs.
         if (rdds.size() > 0) {
           JavaDStream<WindowedValue<T>> joined = unifiedStreams.transform(
@@ -152,9 +150,9 @@ public final class StreamingTransformTranslator {
               return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
             }
           });
-          sec.setStream(transform, joined);
+          context.putDataset(transform, new UnboundedDataset<>(joined));
         } else {
-          sec.setStream(transform, unifiedStreams);
+          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams));
         }
       }
     };
@@ -164,12 +162,11 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         @SuppressWarnings("unchecked")
         WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<T>> dStream =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+            ((UnboundedDataset<T>) context.borrowDataset(transform)).getDStream();
         // get the right window durations.
         Duration windowDuration;
         Duration slideDuration;
@@ -188,10 +185,10 @@ public final class StreamingTransformTranslator {
             dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          sec.setStream(transform, windowedDStream);
+          context.putDataset(transform, new UnboundedDataset<>(windowedDStream));
         } else {
           final OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
-          final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+          final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
           JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
             @Override
@@ -202,7 +199,7 @@ public final class StreamingTransformTranslator {
                 new DoFnFunction<>(accum, addWindowsDoFn, runtimeContext, null, null));
             }
           });
-          sec.setStream(transform, outStream);
+          context.putDataset(transform, new UnboundedDataset<>(outStream));
         }
       }
     };
@@ -212,18 +209,16 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<GroupByKey<K, V>>() {
       @Override
       public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<KV<K, V>>> dStream =
-            (JavaDStream<WindowedValue<KV<K, V>>>) sec.getStream(transform);
+            ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform)).getDStream();
 
         @SuppressWarnings("unchecked")
-        final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder();
+        final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
 
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final WindowingStrategy<?, ?> windowingStrategy =
-            sec.getInput(transform).getWindowingStrategy();
+            context.getInput(transform).getWindowingStrategy();
 
         JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
@@ -237,7 +232,7 @@ public final class StreamingTransformTranslator {
                 windowingStrategy);
           }
         });
-        sec.setStream(transform, outStream);
+        context.putDataset(transform, new UnboundedDataset<>(outStream));
       }
     };
   }
@@ -245,29 +240,29 @@ public final class StreamingTransformTranslator {
   private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
   combineGrouped() {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
+      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
                            EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         // get the applied combine function.
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
-            sec.getInput(transform);
+            context.getInput(transform);
         WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        @SuppressWarnings("unchecked")
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
 
-        @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
-            (JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>>) sec.getStream(transform);
+            ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform))
+                .getDStream();
 
-        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext  =
-            new SparkKeyedCombineFn<>(fn, sec.getRuntimeContext(),
+        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
+            new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(),
                 TranslationUtils.getSideInputs(transform.getSideInputs(), context),
-                    windowingStrategy);
-        sec.setStream(transform, dStream.map(new TranslationUtils.CombineGroupedValues<>(
-            combineFnWithContext)));
+                windowingStrategy);
+        context.putDataset(transform,
+            new UnboundedDataset<>(dStream.map(
+                new TranslationUtils.CombineGroupedValues<>(combineFnWithContext))));
       }
     };
   }
@@ -276,26 +271,24 @@ public final class StreamingTransformTranslator {
   combineGlobally() {
     return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
 
+      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        final PCollection<InputT> input = sec.getInput(transform);
+        final PCollection<InputT> input = context.getInput(transform);
         // serializable arguments to pass.
-        final Coder<InputT> iCoder = sec.getInput(transform).getCoder();
-        final Coder<OutputT> oCoder = sec.getOutput(transform).getCoder();
-        @SuppressWarnings("unchecked")
+        final Coder<InputT> iCoder = context.getInput(transform).getCoder();
+        final Coder<OutputT> oCoder = context.getOutput(transform).getCoder();
         final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn =
             (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final boolean hasDefault = transform.isInsertDefault();
 
-        @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<InputT>> dStream =
-            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
 
         JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(
             new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
@@ -307,7 +300,7 @@ public final class StreamingTransformTranslator {
           }
         });
 
-        sec.setStream(transform, outStream);
+        context.putDataset(transform, new UnboundedDataset<>(outStream));
       }
     };
   }
@@ -315,27 +308,24 @@ public final class StreamingTransformTranslator {
   private static <K, InputT, AccumT, OutputT>
   TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
     return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
+      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform,
                            final EvaluationContext context) {
-        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        final PCollection<KV<K, InputT>> input = sec.getInput(transform);
+        final PCollection<KV<K, InputT>> input = context.getInput(transform);
         // serializable arguments to pass.
-        @SuppressWarnings("unchecked")
         final KvCoder<K, InputT> inputCoder =
-            (KvCoder<K, InputT>) sec.getInput(transform).getCoder();
-        @SuppressWarnings("unchecked")
+            (KvCoder<K, InputT>) context.getInput(transform).getCoder();
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
 
-        @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
-                (JavaDStream<WindowedValue<KV<K, InputT>>>) sec.getStream(transform);
+            ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, InputT>>>,
@@ -347,26 +337,24 @@ public final class StreamingTransformTranslator {
                 windowingStrategy, sideInputs);
           }
         });
-        sec.setStream(transform, outStream);
+        context.putDataset(transform, new UnboundedDataset<>(outStream));
       }
     };
   }
 
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
+      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
                            final EvaluationContext context) {
-        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
-        @SuppressWarnings("unchecked")
         final WindowFn<Object, ?> windowFn =
-            (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn();
-        @SuppressWarnings("unchecked")
+            (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
         JavaDStream<WindowedValue<InputT>> dStream =
-            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
 
         JavaDStream<WindowedValue<OutputT>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<InputT>>,
@@ -381,7 +369,7 @@ public final class StreamingTransformTranslator {
           }
         });
 
-        sec.setStream(transform, outStream);
+        context.putDataset(transform, new UnboundedDataset<>(outStream));
       }
     };
   }
@@ -392,16 +380,15 @@ public final class StreamingTransformTranslator {
       @Override
       public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
                            final EvaluationContext context) {
-        final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         @SuppressWarnings("unchecked")
         final WindowFn<Object, ?> windowFn =
-            (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn();
+            (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<InputT>> dStream =
-            (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
         JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair(
             new Function<JavaRDD<WindowedValue<InputT>>,
                 JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
@@ -414,7 +401,7 @@ public final class StreamingTransformTranslator {
                 runtimeContext, transform.getMainOutputTag(), sideInputs, windowFn));
           }
         }).cache();
-        PCollectionTuple pct = sec.getOutput(transform);
+        PCollectionTuple pct = context.getOutput(transform);
         for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
           @SuppressWarnings("unchecked")
           JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
@@ -424,7 +411,7 @@ public final class StreamingTransformTranslator {
           JavaDStream<WindowedValue<Object>> values =
               (JavaDStream<WindowedValue<Object>>)
                   (JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
-          sec.setStream(e.getValue(), values);
+          context.putDataset(e.getValue(), new UnboundedDataset<>(values));
         }
       }
     };
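
The common thread in the evaluators above: instead of downcasting the context to a
StreamingEvaluationContext, each one borrows a Dataset from the unified EvaluationContext and
branches on its concrete type. A minimal sketch of that dispatch, distilled from the flatten()
evaluator (the class and method names here are illustrative, not part of the commit; the class
sits in the streaming package because UnboundedDataset.getDStream() is package-private):

package org.apache.beam.runners.spark.translation.streaming;

import java.util.List;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;

final class DatasetDispatchSketch {
  private DatasetDispatchSketch() {
  }

  /** Buckets a PCollection's materialization into the RDD or DStream list. */
  @SuppressWarnings("unchecked")
  static <T> void collect(EvaluationContext context,
                          PCollection<T> pcol,
                          List<JavaRDD<WindowedValue<T>>> rdds,
                          List<JavaDStream<WindowedValue<T>>> dStreams) {
    Dataset dataset = context.borrowDataset(pcol);
    if (dataset instanceof UnboundedDataset) {
      // unbounded input: contributes a DStream to the eventual union.
      dStreams.add(((UnboundedDataset<T>) dataset).getDStream());
    } else {
      // bounded input: contributes an RDD, folded into the stream later
      // via JavaDStream.transform(...).
      rdds.add(((BoundedDataset<T>) dataset).getRDD());
    }
  }
}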

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
new file mode 100644
index 0000000..67adee2
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.common.collect.Iterables;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.Dataset;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+/**
+ * DStream holder. Can also create a DStream from a supplied queue of values, mainly for testing.
+ */
+public class UnboundedDataset<T> implements Dataset {
+  // only set if creating a DStream from a static collection
+  @Nullable private transient JavaStreamingContext jssc;
+
+  private Iterable<Iterable<T>> values;
+  private Coder<T> coder;
+  private JavaDStream<WindowedValue<T>> dStream;
+
+  UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
+    this.dStream = dStream;
+  }
+
+  public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
+    this.values = values;
+    this.jssc = jssc;
+    this.coder = coder;
+  }
+
+  @SuppressWarnings("ConstantConditions")
+  JavaDStream<WindowedValue<T>> getDStream() {
+    if (dStream == null) {
+      WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+          WindowedValue.getValueOnlyCoder(coder);
+      // create the DStream from queue
+      Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+      JavaRDD<WindowedValue<T>> lastRDD = null;
+      for (Iterable<T> v : values) {
+        Iterable<WindowedValue<T>> windowedValues =
+            Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
+        JavaRDD<WindowedValue<T>> rdd = jssc.sc().parallelize(
+            CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
+            CoderHelpers.fromByteFunction(windowCoder));
+        rddQueue.offer(rdd);
+        lastRDD = rdd;
+      }
+      // create DStream from queue, one at a time,
+      // with last as default in case batches repeat (graceful stops for example).
+      // if the stream is empty, avoid creating a default empty RDD.
+      // mainly for unit tests, so no reason to make this configurable.
+      dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
+          : jssc.queueStream(rddQueue, true);
+    }
+    return dStream;
+  }
+
+  @Override
+  public void cache() {
+    dStream.cache();
+  }
+
+  @Override
+  public void action() {
+    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
+      @Override
+      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.count();
+      }
+    });
+  }
+
+  @Override
+  public void setName(String name) {
+    // ignore
+  }
+}
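
As the comments in getDStream() indicate, the queue-backed constructor exists mainly to feed
tests: each inner Iterable becomes one micro-batch RDD on the queue, and the last batch doubles
as the queue's default so repeated batches (graceful stops, for example) stay non-empty. A
minimal test-side sketch under those assumptions (the class name, local master, coder and batch
contents are illustrative; it sits in the streaming package to reach the package-private
getDStream()):

package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

final class UnboundedDatasetQueueSketch {
  private UnboundedDatasetQueueSketch() {
  }

  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("queue-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // Two micro-batches; the second ("c") also becomes the queue's default RDD.
    Iterable<Iterable<String>> batches = ImmutableList.<Iterable<String>>of(
        ImmutableList.of("a", "b"),
        ImmutableList.of("c"));

    UnboundedDataset<String> dataset =
        new UnboundedDataset<>(batches, jssc, StringUtf8Coder.of());

    // getDStream() lazily builds the queue stream described above.
    JavaDStream<WindowedValue<String>> stream = dataset.getDStream();
    stream.print();

    jssc.start();
    jssc.awaitTerminationOrTimeout(3000);
    jssc.stop();
  }
}

Note that cache() and action() operate on the dStream field directly, so for a queue-backed
instance they are only safe after getDStream() has populated it; action() then forces each
batch by registering a count() on every RDD.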


[2/2] incubator-beam git commit: This closes #1291

Posted by am...@apache.org.
This closes #1291


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2bc66f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2bc66f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2bc66f90

Branch: refs/heads/master
Commit: 2bc66f903cdfa328c4bb3546befbaa0f58bdd6fa
Parents: 9c300cd 1bef01f
Author: Sela <an...@paypal.com>
Authored: Tue Nov 15 13:37:20 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 13:37:20 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  |   4 +-
 .../spark/translation/BoundedDataset.java       | 114 ++++++++
 .../beam/runners/spark/translation/Dataset.java |  34 +++
 .../spark/translation/EvaluationContext.java    | 230 +++++++---------
 .../spark/translation/TransformTranslator.java  |  99 +++----
 .../SparkRunnerStreamingContextFactory.java     |   7 +-
 .../streaming/StreamingEvaluationContext.java   | 272 -------------------
 .../streaming/StreamingTransformTranslator.java | 135 +++++----
 .../translation/streaming/UnboundedDataset.java | 103 +++++++
 9 files changed, 464 insertions(+), 534 deletions(-)
----------------------------------------------------------------------