Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/14 09:53:54 UTC

[beam] branch spark-runner_structured-streaming updated (d5f235d -> c6618c5)

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


 discard d5f235d  Enable gradle build scan
 discard 2acdf67  Enable test mode
 discard a7d2328  Make all transform translators Serializable
 discard 9fad3d4  Simplify beam reader creation as it is created once the source has already been partitioned
 discard 3be7f2d  Fix SourceTest
 discard 002f0b4  Move SourceTest to same package as tested class
 discard 47c20c2  Add serialization test
 discard 43c737b  Fix SerializationDebugger
 discard 5c9fcd3  Add SerializationDebugger
 discard bab9027  Fix serialization issues
     new c6618c5  First attempt at ParDo primitive implementation

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d5f235d)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (c6618c5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 runners/spark-structured-streaming/build.gradle    |   4 -
 .../spark/structuredstreaming/SparkRunner.java     |   2 +-
 .../translation/TransformTranslator.java           |   3 +-
 .../translation/TranslationContext.java            |  23 ++-
 .../translation/batch/DatasetSourceBatch.java      |  80 +++++-----
 .../translation/batch/DoFnFunction.java            | 137 ++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    | 174 ++++++++++++++++++++-
 .../translation/batch}/SparkProcessContext.java    |  32 +---
 .../batch/functions/SparkNoOpStepContext.java}     |   6 +-
 .../batch/functions/SparkSideInputReader.java}     |  45 +++---
 .../spark/structuredstreaming/SourceTest.java      |  29 ++++
 .../translation/batch/SourceTest.java              |  79 ----------
 .../utils/SerializationDebugger.java               | 131 ----------------
 .../structuredstreaming/utils/package-info.java    |  20 ---
 14 files changed, 427 insertions(+), 338 deletions(-)
 create mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
 copy runners/{spark/src/main/java/org/apache/beam/runners/spark/translation => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch}/SparkProcessContext.java (88%)
 copy runners/{flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java} (85%)
 copy runners/{google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LazilyInitializedSideInputReader.java => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java} (55%)
 create mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
 delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
 delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java
 delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/package-info.java


[beam] 01/01: First attempt at ParDo primitive implementation

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6618c5d4099e3fa2e7f15d7f8388d2b6b9905b0
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Mon Jan 7 10:47:04 2019 +0100

    First attempt at ParDo primitive implementation
---
 .../translation/TranslationContext.java            |  12 ++
 .../translation/batch/DoFnFunction.java            | 137 ++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    | 174 ++++++++++++++++++++-
 .../translation/batch/SparkProcessContext.java     | 149 ++++++++++++++++++
 .../SparkNoOpStepContext.java}                     |  24 +--
 .../batch/functions/SparkSideInputReader.java      |  62 ++++++++
 6 files changed, 545 insertions(+), 13 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e40bb85..ab136dc 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -108,6 +108,13 @@ public class TranslationContext {
     return (Dataset<WindowedValue<T>>) dataset;
   }
 
+  public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
+    }
+  }
+
   public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
     if (!datasets.containsKey(value)) {
       datasets.put(value, dataset);
@@ -131,6 +138,11 @@ public class TranslationContext {
   }
 
   @SuppressWarnings("unchecked")
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+  }
+
+  @SuppressWarnings("unchecked")
   public Map<TupleTag<?>, PValue> getInputs() {
     return currentTransform.getInputs();
   }
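
For context, here is a minimal sketch (not part of this commit) of how a batch translator could combine the new getInput(transform) helper with getDataset/putDataset; the IdentityTranslatorBatch name and the single-output assumption are made up for the example.

import com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.sql.Dataset;

class IdentityTranslatorBatch<T>
    implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {

  @Override
  public void translateTransform(
      PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
    // Resolve the single non-additional input of the current transform.
    PCollection<T> input = context.getInput(transform);
    Dataset<WindowedValue<T>> dataset = context.getDataset(input);
    // Register the unchanged dataset under the transform's only output.
    PValue output = Iterables.getOnlyElement(context.getOutputs().values());
    context.putDataset(output, dataset);
  }
}
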
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
new file mode 100644
index 0000000..35204bc
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class DoFnFunction<InputT, OutputT>
+    implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+  private final SerializablePipelineOptions serializedOptions;
+
+  private final DoFn<InputT, OutputT> doFn;
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+
+  private final Map<TupleTag<?>, Integer> outputMap;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
+
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  public DoFnFunction(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options,
+      Map<TupleTag<?>, Integer> outputMap,
+      TupleTag<OutputT> mainOutputTag,
+      Coder<InputT> inputCoder,
+      Map<TupleTag<?>, Coder<?>> outputCoderMap) {
+
+    this.doFn = doFn;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+    this.outputMap = outputMap;
+    this.mainOutputTag = mainOutputTag;
+    this.inputCoder = inputCoder;
+    this.outputCoderMap = outputCoderMap;
+  }
+
+  @Override
+  public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
+      throws Exception {
+
+    DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
+    DoFnRunner<InputT, OutputT> doFnRunner =
+        DoFnRunners.simpleRunner(
+            serializedOptions.get(),
+            doFn,
+            new SparkSideInputReader(sideInputs),
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            new SparkNoOpStepContext(),
+            inputCoder,
+            outputCoderMap,
+            windowingStrategy);
+
+    return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+        .processPartition(iter)
+        .iterator();
+  }
+
+  private class DoFnOutputManager
+      implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
+
+    @Override
+    public void clear() {
+      outputs.clear();
+    }
+
+    @Override
+    public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
+      Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = outputs.entries().iterator();
+      return Iterators.transform(entryIter, this.entryToTupleFn());
+    }
+
+    private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
+      return en -> new Tuple2<>(en.getKey(), en.getValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      outputs.put(tag, output);
+    }
+  }
+}
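
As an illustration only (not part of the commit), a DoFnFunction for a single-output DoFn could be built and applied roughly as follows; the DoFn, coders, options and the Dataset<WindowedValue<Integer>> named input are assumptions made up for the example, and the usual Beam SDK imports are omitted.

TupleTag<String> mainTag = new TupleTag<String>() {};

Map<TupleTag<?>, Integer> outputMap = new HashMap<>();
outputMap.put(mainTag, 0); // the translator assigns index 0 to the main output

Map<TupleTag<?>, Coder<?>> outputCoders = new HashMap<>();
outputCoders.put(mainTag, StringUtf8Coder.of());

DoFn<Integer, String> toStringFn =
    new DoFn<Integer, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(Integer.toString(c.element()));
      }
    };

DoFnFunction<Integer, String> doFnWrapper =
    new DoFnFunction<>(
        toStringFn,
        WindowingStrategy.globalDefault(),
        Collections.emptyMap(), // no side inputs
        PipelineOptionsFactory.create(),
        outputMap,
        mainTag,
        VarIntCoder.of(),
        outputCoders);

// Every emitted element is tagged, so downstream code can split the result per TupleTag.
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> tagged =
    input.mapPartitions(doFnWrapper, EncoderHelpers.encoder());
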
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 1e57098..1ad1e3b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -17,16 +17,184 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import scala.Tuple2;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * TODO: Add support for state and timers. TODO: Add support for side inputs.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
 class ParDoTranslatorBatch<InputT, OutputT>
     implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {}
+      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {
+
+    DoFn<InputT, OutputT> doFn = getDoFn(context);
+    checkState(
+        !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
+        "Not expected to directly translate splittable DoFn, should have been overridden: %s",
+        doFn);
+
+    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
+    Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+    TupleTag<?> mainOutputTag = getTupleTag(context);
+
+    Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+
+    outputMap.put(mainOutputTag, 0);
+    int count = 1;
+    for (TupleTag<?> tag : outputs.keySet()) {
+      if (!outputMap.containsKey(tag)) {
+        outputMap.put(tag, count++);
+      }
+    }
+
+    // Union coder elements must match the order of the output tags.
+    Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
+    for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
+      indexMap.put(entry.getValue(), entry.getKey());
+    }
+
+    // assume that the windowing strategy is the same for all outputs
+    WindowingStrategy<?, ?> windowingStrategy = null;
+
+    // collect all output Coders and create a UnionCoder for our tagged outputs
+    List<Coder<?>> outputCoders = Lists.newArrayList();
+    for (TupleTag<?> tag : indexMap.values()) {
+      PValue taggedValue = outputs.get(tag);
+      checkState(
+          taggedValue instanceof PCollection,
+          "Within ParDo, got a non-PCollection output %s of type %s",
+          taggedValue,
+          taggedValue.getClass().getSimpleName());
+      PCollection<?> coll = (PCollection<?>) taggedValue;
+      outputCoders.add(coll.getCoder());
+      windowingStrategy = coll.getWindowingStrategy();
+    }
+
+    if (windowingStrategy == null) {
+      throw new IllegalStateException("No outputs defined.");
+    }
+
+    UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+    List<PCollectionView<?>> sideInputs = getSideInputs(context);
+
+    // construct a map from side input to WindowingStrategy so that
+    // the DoFn runner can map main-input windows to side input windows
+    Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+    for (PCollectionView<?> sideInput : sideInputs) {
+      sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+    }
+
+    Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
+
+    @SuppressWarnings("unchecked")
+    DoFnFunction<InputT, OutputT> doFnWrapper =
+        new DoFnFunction(
+            doFn,
+            windowingStrategy,
+            sideInputStrategies,
+            context.getOptions(),
+            outputMap,
+            mainOutputTag,
+            context.getInput(transform).getCoder(),
+            outputCoderMap);
+
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputsDataset =
+        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.encoder());
+
+    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+      pruneOutput(context, allOutputsDataset, output);
+    }
+  }
+
+  private List<PCollectionView<?>> getSideInputs(TranslationContext context) {
+    List<PCollectionView<?>> sideInputs;
+    try {
+      sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return sideInputs;
+  }
+
+  private TupleTag<?> getTupleTag(TranslationContext context) {
+    TupleTag<?> mainOutputTag;
+    try {
+      mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return mainOutputTag;
+  }
+
+  @SuppressWarnings("unchecked")
+  private DoFn<InputT, OutputT> getDoFn(TranslationContext context) {
+    DoFn<InputT, OutputT> doFn;
+    try {
+      doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return doFn;
+  }
+
+  private <T> void pruneOutput(
+      TranslationContext context,
+      Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> tmpDataset,
+      Map.Entry<TupleTag<?>, PValue> output) {
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset =
+        tmpDataset.filter(new SparkDoFnFilterFunction(output.getKey()));
+    Dataset<WindowedValue<?>> outputDataset =
+        filteredDataset.map(
+            (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
+                value -> value._2,
+            EncoderHelpers.encoder());
+    context.putDatasetWildcard(output.getValue(), outputDataset);
+  }
+
+  class SparkDoFnFilterFunction implements FilterFunction<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+    private final TupleTag<?> key;
+
+    public SparkDoFnFilterFunction(TupleTag<?> key) {
+      this.key = key;
+    }
+
+    @Override
+    public boolean call(Tuple2<TupleTag<?>, WindowedValue<?>> value) {
+      return value._1.equals(key);
+    }
+  }
 }
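
For reference, the kind of user pipeline this translator targets is a multi-output ParDo; a minimal example written against the standard Beam API (not part of the commit; pipeline is assumed to exist) looks like this:

TupleTag<String> evens = new TupleTag<String>() {};
TupleTag<String> odds = new TupleTag<String>() {};

PCollectionTuple results =
    pipeline
        .apply(Create.of(1, 2, 3, 4))
        .apply(
            ParDo.of(
                    new DoFn<Integer, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        if (c.element() % 2 == 0) {
                          c.output(Integer.toString(c.element())); // main output
                        } else {
                          c.output(odds, Integer.toString(c.element())); // additional output
                        }
                      }
                    })
                .withOutputTags(evens, TupleTagList.of(odds)));

// The translator runs the DoFn once over each partition, tags every output element,
// and then prunes the tagged dataset into one dataset per TupleTag.
PCollection<String> evenStrings = results.get(evens);
PCollection<String> oddStrings = results.get(odds);
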
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
new file mode 100644
index 0000000..720b7ab
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/** Processes Spark partitions using Beam's {@link DoFnRunner}. */
+class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
+
+  private final DoFn<FnInputT, FnOutputT> doFn;
+  private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
+  private final SparkOutputManager<OutputT> outputManager;
+  private Iterator<TimerInternals.TimerData> timerDataIterator;
+
+  SparkProcessContext(
+      DoFn<FnInputT, FnOutputT> doFn,
+      DoFnRunner<FnInputT, FnOutputT> doFnRunner,
+      SparkOutputManager<OutputT> outputManager,
+      Iterator<TimerInternals.TimerData> timerDataIterator) {
+
+    this.doFn = doFn;
+    this.doFnRunner = doFnRunner;
+    this.outputManager = outputManager;
+    this.timerDataIterator = timerDataIterator;
+  }
+
+  Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition) throws Exception {
+
+    // skip if partition is empty.
+    if (!partition.hasNext()) {
+      return new ArrayList<>();
+    }
+
+    // process the partition; finishBundle() is called from within the output iterator.
+    return this.getOutputIterable(partition, doFnRunner);
+  }
+
+  private void clearOutput() {
+    outputManager.clear();
+  }
+
+  private Iterator<OutputT> getOutputIterator() {
+    return outputManager.iterator();
+  }
+
+  private Iterable<OutputT> getOutputIterable(
+      final Iterator<WindowedValue<FnInputT>> iter,
+      final DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
+    return () -> new ProcCtxtIterator(iter, doFnRunner);
+  }
+
+  interface SparkOutputManager<T> extends OutputManager, Iterable<T> {
+    void clear();
+  }
+
+  private class ProcCtxtIterator extends AbstractIterator<OutputT> {
+
+    private final Iterator<WindowedValue<FnInputT>> inputIterator;
+    private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
+    private Iterator<OutputT> outputIterator;
+    private boolean isBundleStarted;
+    private boolean isBundleFinished;
+
+    ProcCtxtIterator(
+        Iterator<WindowedValue<FnInputT>> iterator, DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
+      this.inputIterator = iterator;
+      this.doFnRunner = doFnRunner;
+      this.outputIterator = getOutputIterator();
+    }
+
+    @Override
+    protected OutputT computeNext() {
+      // Process each element from the (input) iterator, which produces zero, one or more
+      // output elements (of type V) in the output iterator. Note that the output
+      // collection (and iterator) is reset between each call to processElement, so the
+      // collection only holds the output values for each call to processElement, rather
+      // than for the whole partition (which would use too much memory).
+      if (!isBundleStarted) {
+        isBundleStarted = true;
+        // call startBundle() before beginning to process the partition.
+        doFnRunner.startBundle();
+      }
+
+      try {
+        while (true) {
+          if (outputIterator.hasNext()) {
+            return outputIterator.next();
+          }
+
+          clearOutput();
+          if (inputIterator.hasNext()) {
+            // grab the next element and process it.
+            doFnRunner.processElement(inputIterator.next());
+            outputIterator = getOutputIterator();
+          } else if (timerDataIterator.hasNext()) {
+            fireTimer(timerDataIterator.next());
+            outputIterator = getOutputIterator();
+          } else {
+            // no more input to consume, but finishBundle can produce more output
+            if (!isBundleFinished) {
+              isBundleFinished = true;
+              doFnRunner.finishBundle();
+              outputIterator = getOutputIterator();
+              continue; // try to consume outputIterator from start of loop
+            }
+            DoFnInvokers.invokerFor(doFn).invokeTeardown();
+            return endOfData();
+          }
+        }
+      } catch (final RuntimeException re) {
+        DoFnInvokers.invokerFor(doFn).invokeTeardown();
+        throw re;
+      }
+    }
+
+    private void fireTimer(TimerInternals.TimerData timer) {
+      StateNamespace namespace = timer.getNamespace();
+      checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+      doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    }
+  }
+}
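
To see the lazy bundle pattern above in isolation, here is a standalone sketch (not part of the commit) that mimics the startBundle / processElement / finishBundle interleaving with plain Java and Guava, without any Beam types:

import com.google.common.collect.AbstractIterator;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;

class LazyBundleIterator extends AbstractIterator<String> {
  private final Iterator<Integer> input;
  private final Deque<String> buffered = new ArrayDeque<>(); // stands in for the output manager
  private boolean bundleStarted;
  private boolean bundleFinished;

  LazyBundleIterator(Iterator<Integer> input) {
    this.input = input;
  }

  @Override
  protected String computeNext() {
    if (!bundleStarted) {
      bundleStarted = true;
      System.out.println("startBundle"); // called lazily, on the first pull from the iterator
    }
    while (true) {
      if (!buffered.isEmpty()) {
        return buffered.poll(); // drain the outputs of the previous call first
      }
      if (input.hasNext()) {
        buffered.add("processed:" + input.next()); // processElement may buffer 0..n outputs
      } else if (!bundleFinished) {
        bundleFinished = true;
        System.out.println("finishBundle"); // finishBundle may still buffer more output
      } else {
        return endOfData(); // teardown would be invoked here in the real iterator
      }
    }
  }

  public static void main(String[] args) {
    Iterator<String> it = new LazyBundleIterator(Arrays.asList(1, 2).iterator());
    while (it.hasNext()) {
      System.out.println(it.next()); // prints processed:1 then processed:2
    }
  }
}
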
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
similarity index 59%
copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
index 1e57098..889cdf5 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
 
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 
-class ParDoTranslatorBatch<InputT, OutputT>
-    implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
+/** A {@link StepContext} for Spark Batch Runner execution. */
+public class SparkNoOpStepContext implements StepContext {
 
   @Override
-  public void translateTransform(
-      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {}
+  public StateInternals stateInternals() {
+    throw new UnsupportedOperationException("stateInternals is not supported");
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+    throw new UnsupportedOperationException("timerInternals is not supported");
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java
new file mode 100644
index 0000000..da75101
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * TODO: Needs to be implemented.
+ *
+ * A {@link SideInputReader} for the Spark Batch Runner.
+ */
+public class SparkSideInputReader implements SideInputReader {
+  private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
+
+
+  public SparkSideInputReader(
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView) {
+    sideInputs = new HashMap<>();
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    return null;
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return sideInputs.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+}
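
Once the reader is actually implemented, it will have to serve the views that user pipelines declare; as a reminder of that API surface (standard Beam code, not part of the commit; pipeline and the PCollection<String> named words are assumed to exist):

PCollectionView<Map<String, Integer>> limits =
    pipeline
        .apply("Limits", Create.of(KV.of("a", 1), KV.of("b", 2)))
        .apply(View.asMap());

PCollection<String> filtered =
    words.apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // The runner resolves this call through its SideInputReader.
                    Map<String, Integer> map = c.sideInput(limits);
                    if (map.containsKey(c.element())) {
                      c.output(c.element());
                    }
                  }
                })
            .withSideInputs(limits));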