You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:41 UTC

[23/50] incubator-beam git commit: Port direct runner to use new DoFn directly

Port direct runner to use new DoFn directly


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1919d8b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1919d8b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1919d8b3

Branch: refs/heads/apex-runner
Commit: 1919d8b3a850bd146137652546da851ee461cd28
Parents: f0c8d30
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 20:55:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DoFnLifecycleManager.java    | 42 +++++++++--------
 .../beam/runners/direct/ParDoEvaluator.java     |  3 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  6 +--
 .../direct/ParDoSingleEvaluatorFactory.java     |  5 +-
 ...leManagerRemovingTransformEvaluatorTest.java | 16 +++----
 .../direct/DoFnLifecycleManagerTest.java        | 12 ++---
 .../direct/DoFnLifecycleManagersTest.java       | 48 ++++++++++++++++----
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 11 +++++
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 11 +++++
 .../beam/runners/direct/SplittableDoFnTest.java |  8 +++-
 .../org/apache/beam/sdk/transforms/OldDoFn.java | 23 ++++++++--
 11 files changed, 130 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 0e15c18..23460b6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -26,7 +26,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn.Setup;
+import org.apache.beam.sdk.transforms.DoFn.Teardown;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,49 +37,49 @@ import org.slf4j.LoggerFactory;
  * Manages {@link DoFn} setup, teardown, and serialization.
  *
  * <p>{@link DoFnLifecycleManager} is similar to a {@link ThreadLocal} storing a {@link DoFn}, but
- * calls the {@link DoFn} {@link Setup} the first time the {@link DoFn} is obtained and {@link
- * Teardown} whenever the {@link DoFn} is removed, and provides a method for clearing all cached
- * {@link DoFn DoFns}.
+ * calls the {@link DoFn} {@link Setup @Setup} method the first time the {@link DoFn} is obtained
+ * and {@link Teardown @Teardown} whenever the {@link DoFn} is removed, and provides a method for
+ * clearing all cached {@link DoFn DoFns}.
  */
 class DoFnLifecycleManager {
   private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class);
 
-  public static DoFnLifecycleManager of(OldDoFn<?, ?> original) {
+  public static DoFnLifecycleManager of(DoFn<?, ?> original) {
     return new DoFnLifecycleManager(original);
   }
 
-  private final LoadingCache<Thread, OldDoFn<?, ?>> outstanding;
+  private final LoadingCache<Thread, DoFn<?, ?>> outstanding;
 
-  private DoFnLifecycleManager(OldDoFn<?, ?> original) {
+  private DoFnLifecycleManager(DoFn<?, ?> original) {
     this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original));
   }
 
-  public OldDoFn<?, ?> get() throws Exception {
+  public DoFn<?, ?> get() throws Exception {
     Thread currentThread = Thread.currentThread();
     return outstanding.get(currentThread);
   }
 
   public void remove() throws Exception {
     Thread currentThread = Thread.currentThread();
-    OldDoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
-    fn.teardown();
+    DoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
+    DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
   }
 
   /**
    * Remove all {@link DoFn DoFns} from this {@link DoFnLifecycleManager}. Returns all exceptions
    * that were thrown while calling the remove methods.
    *
-   * <p>If the returned Collection is nonempty, an exception was thrown from at least one
-   * {@link DoFn#teardown()} method, and the {@link PipelineRunner} should throw an exception.
+   * <p>If the returned Collection is nonempty, an exception was thrown from at least one {@link
+   * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception.
    */
   public Collection<Exception> removeAll() throws Exception {
-    Iterator<OldDoFn<?, ?>> fns = outstanding.asMap().values().iterator();
+    Iterator<DoFn<?, ?>> fns = outstanding.asMap().values().iterator();
     Collection<Exception> thrown = new ArrayList<>();
     while (fns.hasNext()) {
-      OldDoFn<?, ?> fn = fns.next();
+      DoFn<?, ?> fn = fns.next();
       fns.remove();
       try {
-        fn.teardown();
+        DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
       } catch (Exception e) {
         thrown.add(e);
       }
@@ -85,18 +87,18 @@ class DoFnLifecycleManager {
     return thrown;
   }
 
-  private class DeserializingCacheLoader extends CacheLoader<Thread, OldDoFn<?, ?>> {
+  private class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> {
     private final byte[] original;
 
-    public DeserializingCacheLoader(OldDoFn<?, ?> original) {
+    public DeserializingCacheLoader(DoFn<?, ?> original) {
       this.original = SerializableUtils.serializeToByteArray(original);
     }
 
     @Override
-    public OldDoFn<?, ?> load(Thread key) throws Exception {
-      OldDoFn<?, ?> fn = (OldDoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
+    public DoFn<?, ?> load(Thread key) throws Exception {
+      DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
           "DoFn Copy in thread " + key.getName());
-      fn.setup();
+      DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup();
       return fn;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a59fb4d..b524dfa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -45,7 +46,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
       DirectStepContext stepContext,
       CommittedBundle<InputT> inputBundle,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
-      Object fn,
+      Serializable fn, // may be OldDoFn or DoFn
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index d909e8b..02469ff 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.values.PCollection;
@@ -50,7 +50,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
           public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
               throws Exception {
             BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform();
-            return DoFnLifecycleManager.of(bound.getFn());
+            return DoFnLifecycleManager.of(bound.getNewFn());
           }
         });
   }
@@ -87,7 +87,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
               stepContext,
               inputBundle,
               application,
-              (OldDoFn) fnLocal.get(),
+              (DoFn) fnLocal.get(),
               application.getTransform().getSideInputs(),
               application.getTransform().getMainOutputTag(),
               application.getTransform().getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 1a06ea6..0584e41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
                   public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key)
                       throws Exception {
                     Bound<?, ?> bound = (Bound<?, ?>) key.getTransform();
-                    return DoFnLifecycleManager.of(bound.getFn());
+                    return DoFnLifecycleManager.of(bound.getNewFn());
                   }
                 });
   }
@@ -92,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
               stepContext,
               inputBundle,
               application,
-              (OldDoFn) fnLocal.get(),
+              fnLocal.get(),
               application.getTransform().getSideInputs(),
               mainOutputTag,
               Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 2e4fee2..9e2732e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -27,7 +27,7 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -50,7 +50,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void delegatesToUnderlying() throws Exception {
     RecordingTransformEvaluator underlying = new RecordingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
     WindowedValue<Object> first = WindowedValue.valueInGlobalWindow(new Object());
@@ -67,7 +67,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
     ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     assertThat(original, not(nullValue()));
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -75,7 +75,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
     try {
       evaluator.processElement(WindowedValue.valueInGlobalWindow(new Object()));
     } catch (Exception e) {
-      assertThat(lifecycleManager.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+      assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
     fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -84,7 +84,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   @Test
   public void removesOnExceptionInFinishBundle() throws Exception {
     ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator();
-    OldDoFn<?, ?> original = lifecycleManager.get();
+    DoFn<?, ?> original = lifecycleManager.get();
     // the LifecycleManager is set when the evaluator starts
     assertThat(original, not(nullValue()));
     TransformEvaluator<Object> evaluator =
@@ -94,7 +94,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
       evaluator.finishBundle();
     } catch (Exception e) {
       assertThat(lifecycleManager.get(),
-          Matchers.not(Matchers.<OldDoFn<?, ?>>theInstance(original)));
+          Matchers.not(Matchers.<DoFn<?, ?>>theInstance(original)));
       return;
     }
     fail("Expected ThrowingTransformEvaluator to throw on method call");
@@ -134,8 +134,8 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
   }
 
 
-  private static class TestFn extends OldDoFn<Object, Object> {
-    @Override
+  private static class TestFn extends DoFn<Object, Object> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
index 1f0af99..aef9d29 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -101,7 +101,7 @@ public class DoFnLifecycleManagerTest {
     assertThat(obtained.setupCalled, is(true));
     assertThat(obtained.teardownCalled, is(true));
 
-    assertThat(mgr.get(), not(Matchers.<OldDoFn<?, ?>>theInstance(obtained)));
+    assertThat(mgr.get(), not(Matchers.<DoFn<?, ?>>theInstance(obtained)));
   }
 
   @Test
@@ -142,11 +142,11 @@ public class DoFnLifecycleManagerTest {
   }
 
 
-  private static class TestFn extends OldDoFn<Object, Object> {
+  private static class TestFn extends DoFn<Object, Object> {
     boolean setupCalled = false;
     boolean teardownCalled = false;
 
-    @Override
+    @Setup
     public void setup() {
       checkState(!setupCalled);
       checkState(!teardownCalled);
@@ -154,11 +154,11 @@ public class DoFnLifecycleManagerTest {
       setupCalled = true;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
 
-    @Override
+    @Teardown
     public void teardown() {
       checkState(setupCalled);
       checkState(!teardownCalled);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
index 39a4a9d..a19ff99 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java
@@ -18,12 +18,15 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -51,9 +54,15 @@ public class DoFnLifecycleManagersTest {
     third.get();
 
     final Collection<Matcher<? super Throwable>> suppressions = new ArrayList<>();
-    suppressions.add(new ThrowableMessageMatcher("foo"));
-    suppressions.add(new ThrowableMessageMatcher("bar"));
-    suppressions.add(new ThrowableMessageMatcher("baz"));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("foo"))));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("bar"))));
+    suppressions.add(allOf(
+        instanceOf(UserCodeException.class),
+        new CausedByMatcher(new ThrowableMessageMatcher("baz"))));
 
     thrown.expect(
         new BaseMatcher<Exception>() {
@@ -90,18 +99,18 @@ public class DoFnLifecycleManagersTest {
     DoFnLifecycleManagers.removeAllFromManagers(ImmutableList.of(first, second, third));
   }
 
-  private static class ThrowsInCleanupFn extends OldDoFn<Object, Object> {
+  private static class ThrowsInCleanupFn extends DoFn<Object, Object> {
     private final String message;
 
     private ThrowsInCleanupFn(String message) {
       this.message = message;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
 
-    @Override
+    @Teardown
     public void teardown() throws Exception {
       throw new Exception(message);
     }
@@ -130,9 +139,32 @@ public class DoFnLifecycleManagersTest {
     }
   }
 
+  private static class CausedByMatcher extends BaseMatcher<Throwable> {
+    private final Matcher<Throwable> causeMatcher;
+
+    public CausedByMatcher(
+        Matcher<Throwable> causeMatcher) {
+      this.causeMatcher = causeMatcher;
+    }
 
-  private static class EmptyFn extends OldDoFn<Object, Object> {
     @Override
+    public boolean matches(Object item) {
+      if (!(item instanceof UserCodeException)) {
+        return false;
+      }
+      UserCodeException that = (UserCodeException) item;
+      return causeMatcher.matches(that.getCause());
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("a throwable with a cause ").appendDescriptionOf(causeMatcher);
+    }
+  }
+
+
+  private static class EmptyFn extends DoFn<Object, Object> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 88e1484..8b0070b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -236,6 +237,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
             WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
   }
 
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStatePutsStateInResult() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -321,6 +327,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         containsInAnyOrder("foo", "bara", "bazam"));
   }
 
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
     TestPipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 6a02e40..e562b28 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -166,6 +167,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     assertThat(result.getAggregatorChanges(), equalTo(mutator));
   }
 
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStatePutsStateInResult() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -238,6 +244,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
         containsInAnyOrder("foo", "bara", "bazam"));
   }
 
+  /**
+   * This test ignored, as today testing of GroupByKey is all the state that needs testing.
+   * This should be ported to state when ready.
+   */
+  @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.")
   @Test
   public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
     TestPipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
index 84a0cd9..c164ce6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,6 +45,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.MutableDateTime;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -140,6 +140,9 @@ public class SplittableDoFnTest {
     }
   }
 
+  @Ignore(
+      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+          + "It must be implemented as a primitive.")
   @Test
   public void testPairWithIndexBasic() throws ClassNotFoundException {
     Pipeline p = TestPipeline.create();
@@ -164,6 +167,9 @@ public class SplittableDoFnTest {
     p.run();
   }
 
+  @Ignore(
+      "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
+          + "It must be implemented as a primitive.")
   @Test
   public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index a9f26a4..f16e0b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -73,6 +73,10 @@ import org.joda.time.Instant;
 public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
 
   public DoFn<InputT, OutputT> toDoFn() {
+    DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
+    if (doFn != null) {
+      return doFn;
+    }
     if (this instanceof RequiresWindowAccess) {
       // No parameters as it just accesses `this`
       return new AdaptedRequiresWindowAccessDoFn();
@@ -553,8 +557,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
     private final DoFn<InputT, OutputT>.ProcessContext newContext;
 
-    public AdaptedProcessContext(
-        DoFn<InputT, OutputT>.ProcessContext newContext) {
+    public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
       this.newContext = newContext;
     }
 
@@ -632,21 +635,31 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
 
   private class AdaptedDoFn extends DoFn<InputT, OutputT> {
 
+    @Setup
+    public void setup() throws Exception {
+      OldDoFn.this.setup();
+    }
+
     @StartBundle
-    public void startBundle(DoFn.Context c) throws Exception {
+    public void startBundle(Context c) throws Exception {
       OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
     }
 
     @ProcessElement
-    public void processElement(DoFn.ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
       OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
     }
 
     @FinishBundle
-    public void finishBundle(DoFn.Context c) throws Exception {
+    public void finishBundle(Context c) throws Exception {
       OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
     }
 
+    @Teardown
+    public void teardown() throws Exception {
+      OldDoFn.this.teardown();
+    }
+
     @Override
     public Duration getAllowedTimestampSkew() {
       return OldDoFn.this.getAllowedTimestampSkew();