You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:21 UTC

[03/50] incubator-beam git commit: Rename SimpleDoFnRunner to SimpleOldDoFnRunner and remove public access

Rename SimpleDoFnRunner to SimpleOldDoFnRunner and remove public access


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ff6c9d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ff6c9d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ff6c9d9

Branch: refs/heads/apex-runner
Commit: 8ff6c9d9ae0294b60164a36c096a6c6d59047908
Parents: f043865
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 20:08:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java     | 521 -------------------
 .../beam/runners/core/SimpleOldDoFnRunner.java  | 521 +++++++++++++++++++
 .../beam/runners/core/SimpleDoFnRunnerTest.java |  88 ----
 .../runners/core/SimpleOldDoFnRunnerTest.java   |  88 ++++
 5 files changed, 610 insertions(+), 610 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ff6c9d9/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 41eb9d1..13cd161 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -62,7 +62,7 @@ public class DoFnRunners {
       StepContext stepContext,
       AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
-    return new SimpleDoFnRunner<>(
+    return new SimpleOldDoFnRunner<>(
         options,
         fn,
         sideInputReader,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ff6c9d9/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
deleted file mode 100644
index 8f25705..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-/**
- * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
- *
- * @param <InputT> the type of the {@link OldDoFn} (main) input elements
- * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
- */
-public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  /** The {@link OldDoFn} being run. */
-  private final OldDoFn<InputT, OutputT> fn;
-  /** The context used for running the {@link OldDoFn}. */
-  private final DoFnContext<InputT, OutputT> context;
-
-  public SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
-      AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.context = new DoFnContext<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-            && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
-        invokeProcessElement(windowedValue);
-      }
-    }
-  }
-
-  private void invokeProcessElement(WindowedValue<InputT> elem) {
-    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.processElement(processContext);
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * Returns a new {@link OldDoFn.ProcessContext} for the given element.
-   */
-  private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
-      WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
-  }
-
-  private RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final OldDoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final AggregatorFactory aggregatorFactory;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-                       OldDoFn<InputT, OutputT> fn,
-                       SideInputReader sideInputReader,
-                       OutputManager outputManager,
-                       TupleTag<OutputT> mainOutputTag,
-                       List<TupleTag<?>> sideOutputTags,
-                       StepContext stepContext,
-                       AggregatorFactory aggregatorFactory,
-                       WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> sideOutputTag : sideOutputTags) {
-        outputTags.add(sideOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.aggregatorFactory = aggregatorFactory;
-      this.windowFn = windowFn;
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public W window() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    private <T> void sideOutputWindowedValue(TupleTag<T> tag,
-                                               T output,
-                                               Instant timestamp,
-                                               Collection<? extends BoundedWindow> windows,
-                                               PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in OldDoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
-      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
-    }
-  }
-
-  /**
-   * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
-   * over a single element.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  private static class DoFnProcessContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
-    final OldDoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
-                              DoFnContext<InputT, OutputT> context,
-                              WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a OldDoFn marked as"
-                + "RequiresWindowAccess.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
-        throw new IllegalArgumentException(String.format(
-            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-            timestamp, windowedValue.getTimestamp(),
-            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
-      }
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(
-            TupleTag<?> tag,
-            Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-        createAggregatorInternal(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ff6c9d9/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
new file mode 100644
index 0000000..1298fc8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+/**
+ * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
+ *
+ * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+ */
+class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  /** The {@link OldDoFn} being run. */
+  private final OldDoFn<InputT, OutputT> fn;
+  /** The context used for running the {@link OldDoFn}. */
+  private final DoFnContext<InputT, OutputT> context;
+
+  public SimpleOldDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
+      AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = fn;
+    this.context = new DoFnContext<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        aggregatorFactory,
+        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
+  }
+
+  @Override
+  public void startBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.startBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    if (elem.getWindows().size() <= 1
+        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+            && context.sideInputReader.isEmpty())) {
+      invokeProcessElement(elem);
+    } else {
+      // We could modify the windowed value (and the processContext) to
+      // avoid repeated allocations, but this is more straightforward.
+      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+        invokeProcessElement(windowedValue);
+      }
+    }
+  }
+
+  private void invokeProcessElement(WindowedValue<InputT> elem) {
+    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.processElement(processContext);
+    } catch (Exception ex) {
+      throw wrapUserCodeException(ex);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.finishBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  /**
+   * Returns a new {@link OldDoFn.ProcessContext} for the given element.
+   */
+  private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+      WindowedValue<InputT> elem) {
+    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+  }
+
+  private RuntimeException wrapUserCodeException(Throwable t) {
+    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+  }
+
+  private boolean isSystemDoFn() {
+    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
+   *
+   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+   */
+  private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
+    private static final int MAX_SIDE_OUTPUTS = 1000;
+
+    final PipelineOptions options;
+    final OldDoFn<InputT, OutputT> fn;
+    final SideInputReader sideInputReader;
+    final OutputManager outputManager;
+    final TupleTag<OutputT> mainOutputTag;
+    final StepContext stepContext;
+    final AggregatorFactory aggregatorFactory;
+    final WindowFn<?, ?> windowFn;
+
+    /**
+     * The set of known output tags, some of which may be undeclared, so we can throw an
+     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+     */
+    private Set<TupleTag<?>> outputTags;
+
+    public DoFnContext(PipelineOptions options,
+                       OldDoFn<InputT, OutputT> fn,
+                       SideInputReader sideInputReader,
+                       OutputManager outputManager,
+                       TupleTag<OutputT> mainOutputTag,
+                       List<TupleTag<?>> sideOutputTags,
+                       StepContext stepContext,
+                       AggregatorFactory aggregatorFactory,
+                       WindowFn<?, ?> windowFn) {
+      fn.super();
+      this.options = options;
+      this.fn = fn;
+      this.sideInputReader = sideInputReader;
+      this.outputManager = outputManager;
+      this.mainOutputTag = mainOutputTag;
+      this.outputTags = Sets.newHashSet();
+
+      outputTags.add(mainOutputTag);
+      for (TupleTag<?> sideOutputTag : sideOutputTags) {
+        outputTags.add(sideOutputTag);
+      }
+
+      this.stepContext = stepContext;
+      this.aggregatorFactory = aggregatorFactory;
+      this.windowFn = windowFn;
+      super.setupDelegateAggregators();
+    }
+
+    //////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+      final Instant inputTimestamp = timestamp;
+
+      if (timestamp == null) {
+        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      if (windows == null) {
+        try {
+          // The windowFn can never succeed at accessing the element, so its type does not
+          // matter here
+          @SuppressWarnings("unchecked")
+          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
+            @Override
+            public Object element() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input element when none was available");
+            }
+
+            @Override
+            public Instant timestamp() {
+              if (inputTimestamp == null) {
+                throw new UnsupportedOperationException(
+                    "WindowFn attempted to access input timestamp when none was available");
+              }
+              return inputTimestamp;
+            }
+
+            @Override
+            public W window() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input windows when none were available");
+            }
+          });
+        } catch (Exception e) {
+          throw UserCodeException.wrap(e);
+        }
+      }
+
+      return WindowedValue.of(output, timestamp, windows, pane);
+    }
+
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      if (!sideInputReader.contains(view)) {
+        throw new IllegalArgumentException("calling sideInput() with unknown view");
+      }
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      return sideInputReader.get(view, sideInputWindow);
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+      outputManager.output(mainOutputTag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteOutput(windowedElem);
+      }
+    }
+
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag,
+                                               T output,
+                                               Instant timestamp,
+                                               Collection<? extends BoundedWindow> windows,
+                                               PaneInfo pane) {
+      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+      if (!outputTags.contains(tag)) {
+        // This tag wasn't declared nor was it seen before during this execution.
+        // Thus, this must be a new, undeclared and unconsumed output.
+        // To prevent likely user errors, enforce the limit on the number of side
+        // outputs.
+        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+          throw new IllegalArgumentException(
+              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+        }
+        outputTags.add(tag);
+      }
+
+      outputManager.output(tag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteSideOutput(tag, windowedElem);
+      }
+    }
+
+    // Following implementations of output, outputWithTimestamp, and sideOutput
+    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
+    // ProcessContext's versions in OldDoFn.processElement.
+    @Override
+    public void output(OutputT output) {
+      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
+      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
+    }
+  }
+
+  /**
+   * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn}
+   * over a single element.
+   *
+   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+   */
+  private static class DoFnProcessContext<InputT, OutputT>
+      extends OldDoFn<InputT, OutputT>.ProcessContext {
+
+
+    final OldDoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    final WindowedValue<InputT> windowedValue;
+
+    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
+                              DoFnContext<InputT, OutputT> context,
+                              WindowedValue<InputT> windowedValue) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+      BoundedWindow window;
+      if (!windowIter.hasNext()) {
+        if (context.windowFn instanceof GlobalWindows) {
+          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+          // without windows
+          window = GlobalWindow.INSTANCE;
+        } else {
+          throw new IllegalStateException(
+              "sideInput called when main input element is not in any windows");
+        }
+      } else {
+        window = windowIter.next();
+        if (windowIter.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple windows");
+        }
+      }
+      return context.sideInput(view, window);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a OldDoFn marked as"
+                + "RequiresWindowAccess.");
+      }
+      return Iterables.getOnlyElement(windows());
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.outputWindowedValue(windowedValue.withValue(output));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(output, timestamp,
+          windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    private void checkTimestamp(Instant timestamp) {
+      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+        throw new IllegalArgumentException(String.format(
+            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+            timestamp, windowedValue.getTimestamp(),
+            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+      }
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public void outputWindowedValue(OutputT output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          context.outputWindowedValue(output, timestamp, windows, pane);
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windowedValue.getWindows();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return windowedValue.getPane();
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag,
+            Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+          context.stepContext.writePCollectionViewData(
+              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+              window(), windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+        createAggregatorInternal(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ff6c9d9/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
deleted file mode 100644
index 773f26a..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link SimpleDoFnRunner} functionality.
- */
-@RunWith(JUnit4.class)
-public class SimpleDoFnRunnerTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testExceptionsWrappedAsUserCodeException() {
-    ThrowingDoFn fn = new ThrowingDoFn();
-    DoFnRunner<String, String> runner = createRunner(fn);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(is(fn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  @Test
-  public void testSystemDoFnInternalExceptionsNotWrapped() {
-    ThrowingSystemDoFn fn = new ThrowingSystemDoFn();
-    DoFnRunner<String, String> runner = createRunner(fn);
-
-    thrown.expect(is(fn.exceptionToThrow));
-
-    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
-  }
-
-  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
-    // Pass in only necessary parameters for the test
-    List<TupleTag<?>> sideOutputTags = Arrays.asList();
-    StepContext context = mock(StepContext.class);
-    return new SimpleDoFnRunner<>(
-          null, fn, null, null, null, sideOutputTags, context, null, null);
-  }
-
-  static class ThrowingDoFn extends OldDoFn<String, String> {
-    final Exception exceptionToThrow =
-        new UnsupportedOperationException("Expected exception");
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      throw exceptionToThrow;
-    }
-  }
-
-  @SystemDoFnInternal
-  static class ThrowingSystemDoFn extends ThrowingDoFn {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ff6c9d9/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
new file mode 100644
index 0000000..0e23dcb
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link SimpleOldDoFnRunner} functionality.
+ */
+@RunWith(JUnit4.class)
+public class SimpleOldDoFnRunnerTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn fn = new ThrowingDoFn();
+    DoFnRunner<String, String> runner = createRunner(fn);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(fn.exceptionToThrow));
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  @Test
+  public void testSystemDoFnInternalExceptionsNotWrapped() {
+    ThrowingSystemDoFn fn = new ThrowingSystemDoFn();
+    DoFnRunner<String, String> runner = createRunner(fn);
+
+    thrown.expect(is(fn.exceptionToThrow));
+
+    runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
+    // Pass in only necessary parameters for the test
+    List<TupleTag<?>> sideOutputTags = Arrays.asList();
+    StepContext context = mock(StepContext.class);
+    return new SimpleOldDoFnRunner<>(
+          null, fn, null, null, null, sideOutputTags, context, null, null);
+  }
+
+  static class ThrowingDoFn extends OldDoFn<String, String> {
+    final Exception exceptionToThrow =
+        new UnsupportedOperationException("Expected exception");
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      throw exceptionToThrow;
+    }
+  }
+
+  @SystemDoFnInternal
+  static class ThrowingSystemDoFn extends ThrowingDoFn {
+  }
+}