You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 23:46:22 UTC

[1/3] incubator-beam git commit: This closes #1203

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7e0cfe5b9 -> facf096e5


This closes #1203


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/facf096e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/facf096e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/facf096e

Branch: refs/heads/master
Commit: facf096e5007eba1ad46b63e52104331e163d756
Parents: 7e0cfe5 726998a
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 16:45:07 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 16:45:07 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/ParDoEvaluator.java     |  18 +-
 .../runners/direct/ParDoEvaluatorFactory.java   | 126 ++++++
 .../direct/ParDoMultiEvaluatorFactory.java      | 106 -----
 .../direct/ParDoMultiEvaluatorHooks.java        |  54 +++
 .../direct/ParDoSingleEvaluatorFactory.java     | 109 -----
 .../direct/ParDoSingleEvaluatorHooks.java       |  57 +++
 .../direct/TransformEvaluatorRegistry.java      |  10 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  10 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 439 -------------------
 .../direct/ParDoMultiEvaluatorHooksTest.java    | 439 +++++++++++++++++++
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 335 --------------
 .../direct/ParDoSingleEvaluatorHooksTest.java   | 335 ++++++++++++++
 12 files changed, 1030 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------



[3/3] incubator-beam git commit: Deduplicates ParDo{Single, Multi}EvaluatorFactory

Posted by tg...@apache.org.
Deduplicates ParDo{Single,Multi}EvaluatorFactory

This is in preparation for adding a third one for a future ParDo-like primitive
transform to be introduced inside SplittableParDo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/726998ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/726998ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/726998ae

Branch: refs/heads/master
Commit: 726998ae68ee99eb10bf43ff8aae1a5f121728a4
Parents: 7e0cfe5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Oct 26 16:34:47 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 16:45:07 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/ParDoEvaluator.java     |  18 +-
 .../runners/direct/ParDoEvaluatorFactory.java   | 126 ++++++
 .../direct/ParDoMultiEvaluatorFactory.java      | 106 -----
 .../direct/ParDoMultiEvaluatorHooks.java        |  54 +++
 .../direct/ParDoSingleEvaluatorFactory.java     | 109 -----
 .../direct/ParDoSingleEvaluatorHooks.java       |  57 +++
 .../direct/TransformEvaluatorRegistry.java      |  10 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  10 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 439 -------------------
 .../direct/ParDoMultiEvaluatorHooksTest.java    | 439 +++++++++++++++++++
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 335 --------------
 .../direct/ParDoSingleEvaluatorHooksTest.java   | 335 ++++++++++++++
 12 files changed, 1030 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index ff49b60..5913379 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -39,8 +39,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-class ParDoEvaluator<T> implements TransformEvaluator<T> {
-  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+  public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
@@ -86,17 +86,17 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
-  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
+  private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
+  private final AppliedPTransform<PCollection<InputT>, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
   private final Collection<UncommittedBundle<?>> outputBundles;
   private final DirectStepContext stepContext;
 
-  private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;
+  private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
 
   private ParDoEvaluator(
-      PushbackSideInputDoFnRunner<T, ?> fnRunner,
-      AppliedPTransform<PCollection<T>, ?, ?> transform,
+      PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
+      AppliedPTransform<PCollection<InputT>, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       Collection<UncommittedBundle<?>> outputBundles,
       DirectStepContext stepContext) {
@@ -109,9 +109,9 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
   }
 
   @Override
-  public void processElement(WindowedValue<T> element) {
+  public void processElement(WindowedValue<InputT> element) {
     try {
-      Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
+      Iterable<WindowedValue<InputT>> unprocessed = fnRunner.processElementInReadyWindows(element);
       unprocessedElements.addAll(unprocessed);
     } catch (Exception e) {
       throw UserCodeException.wrap(e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
new file mode 100644
index 0000000..ee4987f
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform
+ * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}.
+ */
+final class ParDoEvaluatorFactory<
+        InputT,
+        OutputT,
+        TransformOutputT extends POutput,
+        TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>>
+    implements TransformEvaluatorFactory {
+  interface TransformHooks<
+      InputT,
+      OutputT,
+      TransformOutputT extends POutput,
+      TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> {
+    /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */
+    DoFn<InputT, OutputT> getDoFn(TransformT transform);
+
+    /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. */
+    ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+        EvaluationContext evaluationContext,
+        AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
+        DirectStepContext stepContext,
+        DoFn<InputT, OutputT> fnLocal);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
+  private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
+  private final EvaluationContext evaluationContext;
+  private final TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks;
+
+  ParDoEvaluatorFactory(
+      EvaluationContext evaluationContext,
+      TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks) {
+    this.evaluationContext = evaluationContext;
+    this.hooks = hooks;
+    fnClones =
+        CacheBuilder.newBuilder()
+            .build(
+                new CacheLoader<DoFn<?, ?>, DoFnLifecycleManager>() {
+                  @Override
+                  public DoFnLifecycleManager load(DoFn<?, ?> key) throws Exception {
+                    return DoFnLifecycleManager.of(key);
+                  }
+                });
+  }
+
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        (TransformEvaluator<T>)
+            createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
+    return evaluator;
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values());
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private TransformEvaluator<InputT> createEvaluator(
+      AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application,
+      CommittedBundle<InputT> inputBundle)
+      throws Exception {
+    String stepName = evaluationContext.getStepName(application);
+    DirectStepContext stepContext =
+        evaluationContext
+            .getExecutionContext(application, inputBundle.getKey())
+            .getOrCreateStepContext(stepName, stepName);
+
+    DoFnLifecycleManager fnManager =
+        fnClones.getUnchecked(hooks.getDoFn(application.getTransform()));
+    try {
+      return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
+          hooks.createParDoEvaluator(
+              evaluationContext, application, stepContext, (DoFn<InputT, OutputT>) fnManager.get()),
+          fnManager);
+    } catch (Exception e) {
+      try {
+        fnManager.remove();
+      } catch (Exception removalException) {
+        LOG.error(
+            "Exception encountered while cleaning up in ParDo evaluator construction",
+            removalException);
+        e.addSuppressed(removalException);
+      }
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
deleted file mode 100644
index ccda0e2..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import java.util.Map;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link BoundMulti} primitive {@link PTransform}.
- */
-class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class);
-  private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
-  private final EvaluationContext evaluationContext;
-
-  public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.evaluationContext = evaluationContext;
-    fnClones = CacheBuilder.newBuilder()
-        .build(new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
-          @Override
-          public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
-              throws Exception {
-            BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
-            return DoFnLifecycleManager.of(bound.getNewFn());
-          }
-        });
-  }
-
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator =
-        createMultiEvaluator((AppliedPTransform) application, inputBundle);
-    return evaluator;
-  }
-
-  @Override
-  public void cleanup() throws Exception {
-    DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values());
-  }
-
-  private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
-      AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
-      CommittedBundle<InT> inputBundle) throws Exception {
-    Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
-
-    DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) application);
-    String stepName = evaluationContext.getStepName(application);
-    DirectStepContext stepContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey())
-            .getOrCreateStepContext(stepName, stepName);
-    try {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      TransformEvaluator<InT> parDoEvaluator =
-          ParDoEvaluator.create(
-              evaluationContext,
-              stepContext,
-              application,
-              (DoFn) fnLocal.get(),
-              application.getTransform().getSideInputs(),
-              application.getTransform().getMainOutputTag(),
-              application.getTransform().getSideOutputTags().getAll(),
-              outputs);
-      return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
-    } catch (Exception e) {
-      try {
-        fnLocal.remove();
-      } catch (Exception removalException) {
-        LOG.error("Exception encountered while cleaning up in ParDo evaluator construction",
-            removalException);
-        e.addSuppressed(removalException);
-      }
-      throw e;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
new file mode 100644
index 0000000..a566154
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */
+class ParDoMultiEvaluatorHooks<InputT, OutputT>
+    implements ParDoEvaluatorFactory.TransformHooks<
+        InputT, OutputT, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
+  @Override
+  public DoFn<InputT, OutputT> getDoFn(ParDo.BoundMulti<InputT, OutputT> transform) {
+    return transform.getNewFn();
+  }
+
+  @Override
+  public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+      EvaluationContext evaluationContext,
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
+          application,
+      DirectStepContext stepContext,
+      DoFn<InputT, OutputT> fnLocal) {
+    ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
+    return ParDoEvaluator.create(
+        evaluationContext,
+        stepContext,
+        application,
+        fnLocal,
+        transform.getSideInputs(),
+        transform.getMainOutputTag(),
+        transform.getSideOutputTags().getAll(),
+        application.getOutput().getAll());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
deleted file mode 100644
index d2a678d..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo.Bound;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound ParDo.Bound} primitive {@link PTransform}.
- */
-class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class);
-  private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
-  private final EvaluationContext evaluationContext;
-
-  public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.evaluationContext = evaluationContext;
-    fnClones =
-        CacheBuilder.newBuilder()
-            .build(
-                new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
-                  @Override
-                  public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
-                      throws Exception {
-                    Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
-                    return DoFnLifecycleManager.of(bound.getNewFn());
-                  }
-                });
-  }
-
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      final AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle) throws Exception {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator =
-        createSingleEvaluator((AppliedPTransform) application, inputBundle);
-    return evaluator;
-  }
-
-  @Override
-  public void cleanup() throws Exception {
-    DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values());
-  }
-
-  private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>>
-          application,
-      CommittedBundle<InputT> inputBundle)
-      throws Exception {
-    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-    String stepName = evaluationContext.getStepName(application);
-    DirectStepContext stepContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey())
-            .getOrCreateStepContext(stepName, stepName);
-
-    DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) application);
-    try {
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      ParDoEvaluator<InputT> parDoEvaluator =
-          ParDoEvaluator.create(
-              evaluationContext,
-              stepContext,
-              application,
-              fnLocal.get(),
-              application.getTransform().getSideInputs(),
-              mainOutputTag,
-              Collections.<TupleTag<?>>emptyList(),
-              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
-      return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnLocal);
-    } catch (Exception e) {
-      try {
-        fnLocal.remove();
-      } catch (Exception removalException) {
-        LOG.error("Exception encountered constructing ParDo evaluator", removalException);
-        e.addSuppressed(removalException);
-      }
-      throw e;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
new file mode 100644
index 0000000..b554f41
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */
+class ParDoSingleEvaluatorHooks<InputT, OutputT>
+    implements ParDoEvaluatorFactory.TransformHooks<
+        InputT, OutputT, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
+  @Override
+  public DoFn<InputT, OutputT> getDoFn(ParDo.Bound<InputT, OutputT> transform) {
+    return transform.getNewFn();
+  }
+
+  @Override
+  public ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+      EvaluationContext evaluationContext,
+      AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>>
+          application,
+      DirectStepContext stepContext,
+      DoFn<InputT, OutputT> fnLocal) {
+    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
+    ParDo.Bound<InputT, OutputT> transform = application.getTransform();
+    return ParDoEvaluator.create(
+        evaluationContext,
+        stepContext,
+        application,
+        fnLocal,
+        transform.getSideInputs(),
+        mainOutputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 3dd44a7..f384a14 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -45,13 +45,17 @@ import org.slf4j.LoggerFactory;
 class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
   public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) {
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes"})
     ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
         ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
             .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
-            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory(ctxt))
-            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory(ctxt))
+            .put(
+                ParDo.Bound.class,
+                new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>()))
+            .put(
+                ParDo.BoundMulti.class,
+                new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>()))
             .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 89f9bfb..8254413 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -93,13 +92,11 @@ public class ParDoEvaluatorTest {
     RecorderFn fn = new RecorderFn(singletonView);
     PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView));
 
-    CommittedBundle<Integer> inputBundle =
-        bundleFactory.createBundle(inputPc).commit(Instant.now());
     UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
-    ParDoEvaluator<Integer> evaluator =
-        createEvaluator(singletonView, fn, inputBundle, output);
+    ParDoEvaluator<Integer, Integer> evaluator =
+        createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
     WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
@@ -130,10 +127,9 @@ public class ParDoEvaluatorTest {
             WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
   }
 
-  private ParDoEvaluator<Integer> createEvaluator(
+  private ParDoEvaluator<Integer, Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
-      DirectRunner.CommittedBundle<Integer> inputBundle,
       PCollection<Integer> output) {
     when(
             evaluationContext.createSideInputReader(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
deleted file mode 100644
index cc83323..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoMultiEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class ParDoMultiEvaluatorFactoryTest implements Serializable {
-  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
-  @Test
-  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-    when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle);
-
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
-            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        lengthOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testParDoMultiUndeclaredSideOutput() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
-        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  private static final String STATE_ID = "my-state-id";
-
-                  @StateId(STATE_ID)
-                  private final StateSpec<Object, BagState<String>> bagSpec =
-                      StateSpecs.bag(StringUtf8Coder.of());
-
-                  @ProcessElement
-                  public void processElement(
-                      ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
-                    bagState.add(c.element());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null,
-        null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
-        equalTo(new Instant(20205L)));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  private static final String EVENT_TIME_TIMER = "event-time-timer";
-                  private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
-                  @TimerId(EVENT_TIME_TIMER)
-                  TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-                  @TimerId(SYNC_PROC_TIME_TIMER)
-                  TimerSpec syncProcTimerSpec =
-                      TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-                  @ProcessElement
-                  public void processElement(
-                      ProcessContext c,
-                      @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
-                      @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
-
-                    eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
-                    syncProcTimeTimer.cancel();
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getTimerUpdate(),
-        equalTo(
-            TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
new file mode 100644
index 0000000..6302d37
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ParDoMultiEvaluatorHooks}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoMultiEvaluatorHooksTest implements Serializable {
+  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
+  @Test
+  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+    final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
+    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle);
+
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
+            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
+
+    assertThat(
+        mainOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(
+                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        elementOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<String>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"),
+            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
+            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        lengthOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void testParDoMultiUndeclaredSideOutput() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+    final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
+
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
+
+    assertThat(
+        mainOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(
+                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        elementOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<String>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"),
+            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
+            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  private static final String STATE_ID = "my-state-id";
+
+                  @StateId(STATE_ID)
+                  private final StateSpec<Object, BagState<String>> bagSpec =
+                      StateSpecs.bag(StringUtf8Coder.of());
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+                    bagState.add(c.element());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
+
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null,
+        null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(20205L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  private static final String EVENT_TIME_TIMER = "event-time-timer";
+                  private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+                  @TimerId(EVENT_TIME_TIMER)
+                  TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                  @TimerId(SYNC_PROC_TIME_TIMER)
+                  TimerSpec syncProcTimerSpec =
+                      TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext c,
+                      @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+                      @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+
+                    eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+                    syncProcTimeTimer.cancel();
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
+
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
+                .setTimer(addedTimer)
+                .setTimer(addedTimer)
+                .setTimer(addedTimer)
+                .deletedTimer(deletedTimer)
+                .deletedTimer(deletedTimer)
+                .deletedTimer(deletedTimer)
+                .build()));
+  }
+}


[2/3] incubator-beam git commit: Deduplicates ParDo{Single, Multi}EvaluatorFactory

Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
deleted file mode 100644
index d22643a..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.Serializable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ParDoSingleEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class ParDoSingleEvaluatorFactoryTest implements Serializable {
-  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-
-  @Test
-  public void testParDoInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
-    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory(evaluationContext)
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-
-    assertThat(
-        outputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
-    PCollection<Integer> collection =
-        input.apply(
-            ParDo.of(
-                new DoFn<String, Integer>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.sideOutput(sideOutputTag, c.element().length());
-                  }
-                }));
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
-    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
-    DirectExecutionContext executionContext =
-        new DirectExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
-        inputBundle.getKey())).thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory(evaluationContext)
-            .forApplication(
-                collection.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getAggregatorChanges(), equalTo(mutator));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              private static final String STATE_ID = "my-state-id";
-
-              @StateId(STATE_ID)
-              private final StateSpec<Object, BagState<String>> bagSpec =
-                  StateSpecs.bag(StringUtf8Coder.of());
-
-              @ProcessElement
-              public void processElement(
-                  ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
-                bagState.add(c.element());
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        StructuralKey.of("myKey", StringUtf8Coder.of()),
-        null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey()))
-        .thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  /**
-   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
-   * This should be ported to state when ready.
-   */
-  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    // TODO: this timer data is absolute, but the new API only support relative settings.
-    // It will require adjustments when @Ignore is removed
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    ParDo.Bound<String, KV<String, Integer>> pardo =
-        ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
-              private static final String EVENT_TIME_TIMER = "event-time-timer";
-              private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
-
-              @TimerId(EVENT_TIME_TIMER)
-              TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-              @TimerId(SYNC_PROC_TIME_TIMER)
-              TimerSpec syncProcTimerSpec =
-                  TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-              @ProcessElement
-              public void processElement(
-                  ProcessContext c,
-                  @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
-                  @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
-                eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
-                syncProcTimeTimer.cancel();
-              }
-            });
-    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
-
-    StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(input).commit(Instant.now());
-
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createBundle(mainOutput);
-
-    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
-
-    DirectExecutionContext executionContext = new DirectExecutionContext(null,
-        key,
-        null,
-        null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
-        inputBundle.getKey()))
-        .thenReturn(executionContext);
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
-    TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory(evaluationContext)
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getTimerUpdate(),
-        equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
-            .setTimer(addedTimer)
-            .deletedTimer(deletedTimer)
-            .build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
new file mode 100644
index 0000000..10cd7c5
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ParDoSingleEvaluatorHooks}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoSingleEvaluatorHooksTest implements Serializable {
+  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
+  @Test
+  public void testParDoInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
+    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
+
+    assertThat(
+        outputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.sideOutput(sideOutputTag, c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
+    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
+  }
+
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              private static final String STATE_ID = "my-state-id";
+
+              @StateId(STATE_ID)
+              private final StateSpec<Object, BagState<String>> bagSpec =
+                  StateSpecs.bag(StringUtf8Coder.of());
+
+              @ProcessElement
+              public void processElement(
+                  ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) {
+                bagState.add(c.element());
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey()))
+        .thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    // TODO: this timer data is absolute, but the new API only support relative settings.
+    // It will require adjustments when @Ignore is removed
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              private static final String EVENT_TIME_TIMER = "event-time-timer";
+              private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer";
+
+              @TimerId(EVENT_TIME_TIMER)
+              TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @TimerId(SYNC_PROC_TIME_TIMER)
+              TimerSpec syncProcTimerSpec =
+                  TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+              @ProcessElement
+              public void processElement(
+                  ProcessContext c,
+                  @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer,
+                  @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) {
+                eventTimeTimer.setForNowPlus(Duration.standardMinutes(5));
+                syncProcTimeTimer.cancel();
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(input).commit(Instant.now());
+
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createBundle(mainOutput);
+
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
+        key,
+        null,
+        null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey()))
+        .thenReturn(executionContext);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>())
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+
+    TransformResult result = evaluator.finishBundle();
+    assertThat(result.getTimerUpdate(),
+        equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
+            .setTimer(addedTimer)
+            .deletedTimer(deletedTimer)
+            .build()));
+  }
+}