You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:29 UTC

[5/7] beam git commit: Creates ProcessFnRunner and wires it through ParDoEvaluator

Creates ProcessFnRunner and wires it through ParDoEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b93de58f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b93de58f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b93de58f

Branch: refs/heads/master
Commit: b93de58f5a3a10877997815a793725cb0e53cc2d
Parents: 7e1a267
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 14:52:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:07 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |  32 +++++
 .../beam/runners/core/ProcessFnRunner.java      | 127 +++++++++++++++++++
 .../beam/runners/direct/ParDoEvaluator.java     | 114 +++++++++++++----
 .../runners/direct/ParDoEvaluatorFactory.java   |  11 +-
 ...littableProcessElementsEvaluatorFactory.java | 106 ++++++++++++----
 .../direct/StatefulParDoEvaluatorFactory.java   |   4 +-
 .../direct/TransformEvaluatorRegistry.java      |   4 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 8 files changed, 341 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index b09ee08..8501e72 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.runners.core;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
 import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -26,10 +28,12 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -146,4 +150,32 @@ public class DoFnRunners {
         stateCleaner,
         droppedDueToLateness);
   }
+
+  public static <InputT, OutputT, RestrictionT>
+  ProcessFnRunner<InputT, OutputT, RestrictionT>
+  newProcessFnRunner(
+      ProcessFn<InputT, OutputT, RestrictionT, ?> fn,
+      PipelineOptions options,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new ProcessFnRunner<>(
+        simpleRunner(
+            options,
+            fn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy),
+        views,
+        sideInputReader);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
new file mode 100644
index 0000000..3ae3f50
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.SplittableParDo.ProcessFn;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Instant;
+
+/** Runs a {@link ProcessFn} by constructing the appropriate contexts and passing them in. */
+public class ProcessFnRunner<InputT, OutputT, RestrictionT>
+    implements PushbackSideInputDoFnRunner<
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+  private final DoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  ProcessFnRunner(
+      DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    underlying.startBundle();
+  }
+
+  @Override
+  public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
+      processElementInReadyWindows(
+          WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+              windowedKWI) {
+    checkTrivialOuterWindows(windowedKWI);
+    BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
+    if (!isReady(window)) {
+      return Collections.singletonList(windowedKWI);
+    }
+    underlying.processElement(windowedKWI);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void finishBundle() {
+    underlying.finishBundle();
+  }
+
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
+  }
+
+  private static <T> void checkTrivialOuterWindows(
+      WindowedValue<KeyedWorkItem<String, T>> windowedKWI) {
+    // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or ValueInGlobalWindow)
+    Collection<? extends BoundedWindow> outerWindows = windowedKWI.getWindows();
+    if (!outerWindows.isEmpty()) {
+      checkArgument(
+          outerWindows.size() == 1,
+          "The KeyedWorkItem itself must not be in multiple windows, but was in: %s",
+          outerWindows);
+      BoundedWindow onlyWindow = Iterables.getOnlyElement(outerWindows);
+      checkArgument(
+          onlyWindow instanceof GlobalWindow,
+          "KeyedWorkItem must be in the Global window, but was in: %s",
+          onlyWindow);
+    }
+  }
+
+  private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) {
+    if (Iterables.isEmpty(kwi.elementsIterable())) {
+      // ProcessFn sets only a single timer.
+      TimerData timer = Iterables.getOnlyElement(kwi.timersIterable());
+      return ((WindowNamespace) timer.getNamespace()).getWindow();
+    } else {
+      // KWI must have a single element in elementsIterable, because it follows a GBK by a
+      // uniquely generated key.
+      // Additionally, windows must be exploded before GBKIntoKeyedWorkItems, so there's also
+      // only a single window.
+      WindowedValue<T> value = Iterables.getOnlyElement(kwi.elementsIterable());
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index bab7b2c..cab11db 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -43,6 +44,50 @@ import org.apache.beam.sdk.values.TupleTag;
 
 class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
+  public interface DoFnRunnerFactory<InputT, OutputT> {
+    PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+        PipelineOptions options,
+        DoFn<InputT, OutputT> fn,
+        List<PCollectionView<?>> sideInputs,
+        ReadyCheckingSideInputReader sideInputReader,
+        OutputManager outputManager,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> additionalOutputTags,
+        DirectStepContext stepContext,
+        AggregatorContainer.Mutator aggregatorChanges,
+        WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
+  }
+
+  public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
+    return new DoFnRunnerFactory<InputT, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+          PipelineOptions options,
+          DoFn<InputT, OutputT> fn,
+          List<PCollectionView<?>> sideInputs,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          DirectStepContext stepContext,
+          AggregatorContainer.Mutator aggregatorChanges,
+          WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        DoFnRunner<InputT, OutputT> underlying =
+            DoFnRunners.simpleRunner(
+                options,
+                fn,
+                sideInputReader,
+                outputManager,
+                mainOutputTag,
+                additionalOutputTags,
+                stepContext,
+                aggregatorChanges,
+                windowingStrategy);
+        return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+      }
+    };
+  }
+
   public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
@@ -53,9 +98,43 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
-      Map<TupleTag<?>, PCollection<?>> outputs) {
+      Map<TupleTag<?>, PCollection<?>> outputs,
+      DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
     AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
 
+    BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
+
+    ReadyCheckingSideInputReader sideInputReader =
+        evaluationContext.createSideInputReader(sideInputs);
+
+    PushbackSideInputDoFnRunner<InputT, OutputT> runner = runnerFactory.createRunner(
+        evaluationContext.getPipelineOptions(),
+        fn,
+        sideInputs,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        additionalOutputTags,
+        stepContext,
+        aggregatorChanges,
+        windowingStrategy);
+
+    return create(runner, stepContext, application, aggregatorChanges, outputManager);
+  }
+
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+      PushbackSideInputDoFnRunner<InputT, OutputT> runner,
+      DirectStepContext stepContext,
+      AppliedPTransform<?, ?, ?> application,
+      AggregatorContainer.Mutator aggregatorChanges,
+      BundleOutputManager outputManager) {
+    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+  }
+
+  static BundleOutputManager createOutputManager(
+      EvaluationContext evaluationContext,
+      StructuralKey<?> key,
+      Map<TupleTag<?>, PCollection<?>> outputs) {
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
       // Just trust the context's decision as to whether the output should be keyed.
@@ -69,32 +148,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
             outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
       }
     }
-    BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
-
-    ReadyCheckingSideInputReader sideInputReader =
-        evaluationContext.createSideInputReader(sideInputs);
-
-    DoFnRunner<InputT, OutputT> underlying =
-        DoFnRunners.simpleRunner(
-            evaluationContext.getPipelineOptions(),
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            additionalOutputTags,
-            stepContext,
-            aggregatorChanges,
-            windowingStrategy);
-    PushbackSideInputDoFnRunner<InputT, OutputT> runner =
-        SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
-    try {
-      runner.startBundle();
-    } catch (Exception e) {
-      throw UserCodeException.wrap(e);
-    }
-
-    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+    return BundleOutputManager.create(outputBundles);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -119,6 +173,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
     this.stepContext = stepContext;
     this.aggregatorChanges = aggregatorChanges;
     this.unprocessedElements = ImmutableList.builder();
+
+    try {
+      fnRunner.startBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
   }
 
   public BundleOutputManager getOutputManager() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 93f204a..b00c2b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -43,9 +43,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
   private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
+  private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
 
-  ParDoEvaluatorFactory(EvaluationContext evaluationContext) {
+  ParDoEvaluatorFactory(
+      EvaluationContext evaluationContext,
+      ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
     this.evaluationContext = evaluationContext;
+    this.runnerFactory = runnerFactory;
     fnClones =
         CacheBuilder.newBuilder()
             .build(
@@ -148,7 +152,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           sideInputs,
           mainOutputTag,
           additionalOutputTags,
-          pcollections(application.getOutputs()));
+          pcollections(application.getOutputs()),
+          runnerFactory);
     } catch (Exception e) {
       try {
         fnManager.remove();
@@ -162,7 +167,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
   }
 
-  private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
+  static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
     Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
       pcs.put(output.getKey(), (PCollection<?>) output.getValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 00b16dd..7efdb52 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -18,25 +18,34 @@
 package org.apache.beam.runners.direct;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ElementAndRestriction;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.ProcessFn;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -51,7 +60,11 @@ class SplittableProcessElementsEvaluatorFactory<
 
   SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
-    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+    this.delegateFactory =
+        new ParDoEvaluatorFactory<>(
+            evaluationContext,
+            SplittableProcessElementsEvaluatorFactory
+                .<InputT, OutputT, RestrictionT>processFnRunnerFactory());
   }
 
   @Override
@@ -82,12 +95,12 @@ class SplittableProcessElementsEvaluatorFactory<
     final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
-    SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+    ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
         transform.newProcessFn(transform.getFn());
 
     DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
     processFn =
-        ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+        ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
             fnManager
                 .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
                     get());
@@ -98,7 +111,7 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+    final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,
@@ -127,34 +140,36 @@ class SplittableProcessElementsEvaluatorFactory<
           }
         });
 
-    final OutputManager outputManager = parDoEvaluator.getOutputManager();
+    OutputWindowedValue<OutputT> outputWindowedValue =
+        new OutputWindowedValue<OutputT>() {
+          private final OutputManager outputManager = parDoEvaluator.getOutputManager();
+
+          @Override
+          public void outputWindowedValue(
+              OutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(
+                transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+          }
+
+          @Override
+          public <AdditionalOutputT> void outputWindowedValue(
+              TupleTag<AdditionalOutputT> tag,
+              AdditionalOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+          }
+        };
     processFn.setProcessElementInvoker(
         new OutputAndTimeBoundedSplittableProcessElementInvoker<
             InputT, OutputT, RestrictionT, TrackerT>(
             transform.getFn(),
             evaluationContext.getPipelineOptions(),
-            new OutputWindowedValue<OutputT>() {
-              @Override
-              public void outputWindowedValue(
-                  OutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(
-                    transform.getMainOutputTag(),
-                    WindowedValue.of(output, timestamp, windows, pane));
-              }
-
-              @Override
-              public <AdditionalOutputT> void outputWindowedValue(
-                  TupleTag<AdditionalOutputT> tag,
-                  AdditionalOutputT output,
-                  Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo pane) {
-                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
-              }
-            },
+            outputWindowedValue,
             evaluationContext.createSideInputReader(transform.getSideInputs()),
             // TODO: For better performance, use a higher-level executor?
             Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
@@ -163,4 +178,41 @@ class SplittableProcessElementsEvaluatorFactory<
 
     return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
   }
+
+  private static <InputT, OutputT, RestrictionT>
+  ParDoEvaluator.DoFnRunnerFactory<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+          processFnRunnerFactory() {
+    return new ParDoEvaluator.DoFnRunnerFactory<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+      @Override
+      public PushbackSideInputDoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+      createRunner(
+          PipelineOptions options,
+          DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
+          List<PCollectionView<?>> sideInputs,
+          ReadyCheckingSideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          DirectExecutionContext.DirectStepContext stepContext,
+          AggregatorContainer.Mutator aggregatorChanges,
+          WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
+        ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+            (ProcessFn) fn;
+        return DoFnRunners.newProcessFnRunner(
+            processFn,
+            options,
+            sideInputs,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            aggregatorChanges,
+            windowingStrategy);
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index be77ea1..8793ae8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -65,7 +65,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
   private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
 
   StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+    this.delegateFactory =
+        new ParDoEvaluatorFactory<>(
+            evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory());
     this.cleanupRegistry =
         CacheBuilder.newBuilder()
             .weakValues()

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index ae7ad93..d06c460 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -52,7 +52,9 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
         ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
             .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
-            .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
+            .put(
+                ParDo.MultiOutput.class,
+                new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory()))
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
             .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(WriteView.class, new ViewEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2be0f9d..e99e4bf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -169,7 +169,8 @@ public class ParDoEvaluatorTest {
         ImmutableList.<PCollectionView<?>>of(singletonView),
         mainOutputTag,
         additionalOutputTags,
-        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output),
+        ParDoEvaluator.<Integer, Integer>defaultRunnerFactory());
   }
 
   private static class RecorderFn extends DoFn<Integer, Integer> {