You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/28 21:17:20 UTC

[1/5] incubator-beam git commit: Remove overspecified type in ParDoEvaluator

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5e9a80cf6 -> 33c687069


Remove overspecified type in ParDoEvaluator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b7b065f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b7b065f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b7b065f

Branch: refs/heads/master
Commit: 1b7b065f4ccae7c52934b1e73fd1fbfb33c3398d
Parents: 5e9a80c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 11:34:45 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 28 11:07:28 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/direct/ParDoEvaluator.java    | 6 +++---
 .../org/apache/beam/runners/direct/ParDoEvaluatorFactory.java  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b7b065f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 254fa44..3285c7e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -45,7 +45,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
-      AppliedPTransform<PCollection<InputT>, ?, ?> application,
+      AppliedPTransform<?, ?, ?> application,
       WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
       Serializable fn, // may be OldDoFn or DoFn
       List<PCollectionView<?>> sideInputs,
@@ -90,7 +90,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
-  private final AppliedPTransform<PCollection<InputT>, ?, ?> transform;
+  private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
   private final Collection<UncommittedBundle<?>> outputBundles;
   private final DirectStepContext stepContext;
@@ -99,7 +99,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
 
   private ParDoEvaluator(
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
-      AppliedPTransform<PCollection<InputT>, ?, ?> transform,
+      AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       Collection<UncommittedBundle<?>> outputBundles,
       DirectStepContext stepContext) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b7b065f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index f126000..b776da1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -82,7 +82,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     try {
       ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
       return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
-          ParDoEvaluator.create(
+          ParDoEvaluator.<InputT, OutputT>create(
               evaluationContext,
               stepContext,
               application,


[2/5] incubator-beam git commit: Add simple tests for stateful ParDo

Posted by ke...@apache.org.
Add simple tests for stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e158e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e158e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e158e4e

Branch: refs/heads/master
Commit: 7e158e4e583372dd79ffaa380ac7c2dbb4846c50
Parents: e17dc4a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:27 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 28 11:43:21 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 106 ++++++++++++++++++-
 1 file changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e158e4e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index be1eaa4..593f304 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -1464,8 +1465,8 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  public void testValueState() {
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateSimple() {
     final String stateId = "foo";
 
     DoFn<KV<String, Integer>, Integer> fn =
@@ -1494,8 +1495,59 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  public void testBagSTate() {
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateSideOutput() {
+    final String stateId = "foo";
+
+    final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
+    final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+            if (currentValue % 2 == 0) {
+              c.output(currentValue);
+            } else {
+              c.sideOutput(oddTag, currentValue);
+            }
+            state.write(currentValue + 1);
+          }
+        };
+
+    Pipeline p = TestPipeline.create();
+    PCollectionTuple output =
+        p.apply(
+                Create.of(
+                    KV.of("hello", 42),
+                    KV.of("hello", 97),
+                    KV.of("hello", 84),
+                    KV.of("goodbye", 33),
+                    KV.of("hello", 859),
+                    KV.of("goodbye", 83945)))
+            .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
+
+    PCollection<Integer> evens = output.get(evenTag);
+    PCollection<Integer> odds = output.get(oddTag);
+
+    // There are 0 and 2 from "hello" and just 0 from "goodbye"
+    PAssert.that(evens).containsInAnyOrder(0, 2, 0);
+
+    // There are 1 and 3 from "hello" and just 1 from "goodbye"
+    PAssert.that(odds).containsInAnyOrder(1, 3, 1);
+    p.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testBagState() {
     final String stateId = "foo";
 
     DoFn<KV<String, Integer>, List<Integer>> fn =
@@ -1530,6 +1582,52 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testBagStateSideInput() {
+    Pipeline p = TestPipeline.create();
+
+    final PCollectionView<List<Integer>> listView =
+        p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList());
+
+    final String stateId = "foo";
+    DoFn<KV<String, Integer>, List<Integer>> fn =
+        new DoFn<KV<String, Integer>, List<Integer>>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, BagState<Integer>> bufferState =
+              StateSpecs.bag(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) BagState<Integer> state) {
+            Iterable<Integer> currentValue = state.read();
+            state.add(c.element().getValue());
+            if (Iterables.size(state.read()) >= 4) {
+              List<Integer> sorted = Lists.newArrayList(currentValue);
+              Collections.sort(sorted);
+              c.output(sorted);
+
+              List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView));
+              Collections.sort(sideSorted);
+              c.output(sideSorted);
+            }
+          }
+        };
+
+    PCollection<List<Integer>> output =
+        p.apply(
+                "Create main input",
+                Create.of(
+                    KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
+            .apply(ParDo.of(fn).withSideInputs(listView));
+
+    PAssert.that(output).containsInAnyOrder(
+        Lists.newArrayList(12, 42, 84, 97),
+        Lists.newArrayList(0, 1, 2));
+    p.run();
+  }
+
+  @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {
       @ProcessElement


[4/5] incubator-beam git commit: Add support for Stateful ParDo in the Direct runner

Posted by ke...@apache.org.
Add support for Stateful ParDo in the Direct runner

This adds overrides and new evaluators to ensure that
state is accessed in a single-threaded manner per key
and is cleaned up when a window expires.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec2c0e06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec2c0e06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec2c0e06

Branch: refs/heads/master
Commit: ec2c0e0698c1380b309a609eb642aba445c77e27
Parents: 7e158e4
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 9 21:59:15 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 28 11:48:32 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/EvaluationContext.java  |  15 +
 .../beam/runners/direct/ParDoEvaluator.java     |  11 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  53 +++-
 .../direct/ParDoMultiOverrideFactory.java       |  76 ++++-
 .../ParDoSingleViaMultiOverrideFactory.java     |   6 +-
 .../direct/StatefulParDoEvaluatorFactory.java   | 256 ++++++++++++++++
 .../direct/TransformEvaluatorRegistry.java      |   2 +
 .../direct/WatermarkCallbackExecutor.java       |  34 +++
 .../StatefulParDoEvaluatorFactoryTest.java      | 300 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |   4 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   8 +-
 11 files changed, 741 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index c1225f6..201aaed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -296,6 +296,21 @@ class EvaluationContext {
     fireAvailableCallbacks(lookupProducing(value));
   }
 
+  /**
+   * Schedules a callback to be executed after the given window has expired.
+   *
+   * <p>For example, upstream state associated with the window may be cleared.
+   */
+  public void scheduleAfterWindowExpiration(
+      AppliedPTransform<?, ?, ?> producing,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    callbackExecutor.callOnWindowExpiration(producing, window, windowingStrategy, runnable);
+
+    fireAvailableCallbacks(producing);
+  }
+
   private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
     if (value.getProducingTransformInternal() != null) {
       return value.getProducingTransformInternal();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 3285c7e..750e5f1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+
   public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
@@ -84,11 +85,17 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     }
 
     return new ParDoEvaluator<>(
-        runner, application, aggregatorChanges, outputBundles.values(), stepContext);
+        evaluationContext,
+        runner,
+        application,
+        aggregatorChanges,
+        outputBundles.values(),
+        stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
+  private final EvaluationContext evaluationContext;
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
@@ -98,11 +105,13 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
 
   private ParDoEvaluator(
+      EvaluationContext evaluationContext,
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       Collection<UncommittedBundle<?>> outputBundles,
       DirectStepContext stepContext) {
+    this.evaluationContext = evaluationContext;
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.outputBundles = outputBundles;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b776da1..02e034a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -20,14 +20,16 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import java.util.List;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,10 +56,26 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   @Override
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+
+    AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
+        parDoApplication =
+            (AppliedPTransform<
+                    PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>)
+                application;
+
+    ParDo.BoundMulti<InputT, OutputT> transform = parDoApplication.getTransform();
+    final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
         (TransformEvaluator<T>)
-            createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
+            createEvaluator(
+                (AppliedPTransform) application,
+                inputBundle.getKey(),
+                doFn,
+                transform.getSideInputs(),
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll());
     return evaluator;
   }
 
@@ -66,21 +84,32 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values());
   }
 
+  /**
+   * Creates an evaluator for an arbitrary {@link AppliedPTransform} node, with the pieces of the
+   * {@link ParDo} unpacked.
+   *
+   * <p>This can thus be invoked regardless of whether the types in the {@link AppliedPTransform}
+   * correspond to the types of the unpacked {@link DoFn}, side inputs, and output tags.
+   */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  private TransformEvaluator<InputT> createEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>>
-          application,
-      CommittedBundle<InputT> inputBundle)
+  TransformEvaluator<InputT> createEvaluator(
+        AppliedPTransform<PCollection<?>, PCollectionTuple, ?>
+        application,
+        StructuralKey<?> inputBundleKey,
+        DoFn<InputT, OutputT> doFn,
+        List<PCollectionView<?>> sideInputs,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> sideOutputTags)
       throws Exception {
     String stepName = evaluationContext.getStepName(application);
     DirectStepContext stepContext =
         evaluationContext
-            .getExecutionContext(application, inputBundle.getKey())
+            .getExecutionContext(application, inputBundleKey)
             .getOrCreateStepContext(stepName, stepName);
 
-    DoFnLifecycleManager fnManager = fnClones.getUnchecked(application.getTransform().getNewFn());
+    DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
+
     try {
-      ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform();
       return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
           ParDoEvaluator.<InputT, OutputT>create(
               evaluationContext,
@@ -88,9 +117,9 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
               application,
               application.getInput().getWindowingStrategy(),
               fnManager.get(),
-              transform.getSideInputs(),
-              transform.getMainOutputTag(),
-              transform.getSideOutputTags().getAll(),
+              sideInputs,
+              mainOutputTag,
+              sideOutputTags,
               application.getOutput().getAll()),
           fnManager);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 6cc3e6e..8db5159 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -18,13 +18,19 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypedPValue;
 
 /**
  * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
@@ -42,10 +48,74 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
     DoFn<InputT, OutputT> fn = transform.getNewFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-    if (!signature.processElement().isSplittable()) {
-      return transform;
-    } else {
+    if (signature.processElement().isSplittable()) {
       return new SplittableParDo(fn);
+    } else if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+
+      // Based on the fact that the signature is stateful, DoFnSignatures ensures
+      // that it is also keyed
+      ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =
+          (ParDo.BoundMulti<KV<?, ?>, OutputT>) transform;
+
+      return new GbkThenStatefulParDo(keyedTransform);
+    } else {
+      return transform;
+    }
+  }
+
+  static class GbkThenStatefulParDo<K, InputT, OutputT>
+      extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+    private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;
+
+    public GbkThenStatefulParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo) {
+      this.underlyingParDo = underlyingParDo;
+    }
+
+    @Override
+    public PCollectionTuple apply(PCollection<KV<K, InputT>> input) {
+
+      PCollectionTuple outputs = input
+          .apply("Group by key", GroupByKey.<K, InputT>create())
+          .apply("Stateful ParDo", new StatefulParDo<>(underlyingParDo, input));
+
+      return outputs;
+    }
+  }
+
+  static class StatefulParDo<K, InputT, OutputT>
+      extends PTransform<PCollection<? extends KV<K, Iterable<InputT>>>, PCollectionTuple> {
+    private final transient ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;
+    private final transient PCollection<KV<K, InputT>> originalInput;
+
+    public StatefulParDo(
+        ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo,
+        PCollection<KV<K, InputT>> originalInput) {
+      this.underlyingParDo = underlyingParDo;
+      this.originalInput = originalInput;
+    }
+
+    public ParDo.BoundMulti<KV<K, InputT>, OutputT> getUnderlyingParDo() {
+      return underlyingParDo;
+    }
+
+    @Override
+    public <T> Coder<T> getDefaultOutputCoder(
+        PCollection<? extends KV<K, Iterable<InputT>>> input, TypedPValue<T> output)
+        throws CannotProvideCoderException {
+      return underlyingParDo.getDefaultOutputCoder(originalInput, output);
+    }
+
+    public PCollectionTuple apply(PCollection<? extends KV<K, Iterable<InputT>>> input) {
+
+      PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
+          input.getPipeline(),
+          TupleTagList.of(underlyingParDo.getMainOutputTag())
+              .and(underlyingParDo.getSideOutputTags().getAll()),
+          input.getWindowingStrategy(),
+          input.isBounded());
+
+      return outputs;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index ee3dfc5..f220a46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -54,13 +54,15 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
       // Output tags for ParDo need only be unique up to applied transform
       TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG);
 
-      PCollectionTuple output =
+      PCollectionTuple outputs =
           input.apply(
               ParDo.of(underlyingParDo.getNewFn())
                   .withSideInputs(underlyingParDo.getSideInputs())
                   .withOutputTags(mainOutputTag, TupleTagList.empty()));
+      PCollection<OutputT> output = outputs.get(mainOutputTag);
 
-      return output.get(mainOutputTag);
+      output.setTypeDescriptorInternal(underlyingParDo.getNewFn().getOutputTypeDescriptor());
+      return output;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
new file mode 100644
index 0000000..1f3286c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+/** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
+final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
+
+  private final LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable>
+      cleanupRegistry;
+
+  private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
+
+  StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+    this.cleanupRegistry =
+        CacheBuilder.newBuilder()
+            .weakValues()
+            .build(new CleanupSchedulingLoader(evaluationContext));
+  }
+
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        (TransformEvaluator<T>)
+            createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
+    return evaluator;
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    delegateFactory.cleanup();
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private TransformEvaluator<KV<K, Iterable<InputT>>> createEvaluator(
+      AppliedPTransform<
+              PCollection<? extends KV<K, Iterable<InputT>>>, PCollectionTuple,
+              StatefulParDo<K, InputT, OutputT>>
+          application,
+      CommittedBundle<KV<K, Iterable<InputT>>> inputBundle)
+      throws Exception {
+
+    final DoFn<KV<K, InputT>, OutputT> doFn =
+        application.getTransform().getUnderlyingParDo().getNewFn();
+    final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    // If the DoFn is stateful, schedule state clearing.
+    // It is semantically correct to schedule any number of redundant clear tasks; the
+    // cache is used to limit the number of tasks to avoid performance degradation.
+    if (signature.stateDeclarations().size() > 0) {
+      for (final WindowedValue<?> element : inputBundle.getElements()) {
+        for (final BoundedWindow window : element.getWindows()) {
+          cleanupRegistry.get(
+              AppliedPTransformOutputKeyAndWindow.create(
+                  application, (StructuralKey<K>) inputBundle.getKey(), window));
+        }
+      }
+    }
+
+    TransformEvaluator<KV<K, InputT>> delegateEvaluator =
+        delegateFactory.createEvaluator(
+            (AppliedPTransform) application,
+            inputBundle.getKey(),
+            doFn,
+            application.getTransform().getUnderlyingParDo().getSideInputs(),
+            application.getTransform().getUnderlyingParDo().getMainOutputTag(),
+            application.getTransform().getUnderlyingParDo().getSideOutputTags().getAll());
+
+    return new StatefulParDoEvaluator<>(delegateEvaluator);
+  }
+
+  private class CleanupSchedulingLoader
+      extends CacheLoader<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> {
+
+    private final EvaluationContext evaluationContext;
+
+    public CleanupSchedulingLoader(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Runnable load(
+        final AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> transformOutputWindow) {
+      String stepName = evaluationContext.getStepName(transformOutputWindow.getTransform());
+
+      PCollection<?> pc =
+          transformOutputWindow
+              .getTransform()
+              .getOutput()
+              .get(
+                  transformOutputWindow
+                      .getTransform()
+                      .getTransform()
+                      .getUnderlyingParDo()
+                      .getMainOutputTag());
+      WindowingStrategy<?, ?> windowingStrategy = pc.getWindowingStrategy();
+      BoundedWindow window = transformOutputWindow.getWindow();
+      final DoFn<?, ?> doFn =
+          transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getNewFn();
+      final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+      final DirectStepContext stepContext =
+          evaluationContext
+              .getExecutionContext(
+                  transformOutputWindow.getTransform(), transformOutputWindow.getKey())
+              .getOrCreateStepContext(stepName, stepName);
+
+      final StateNamespace namespace =
+          StateNamespaces.window(
+              (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(), window);
+
+      Runnable cleanup =
+          new Runnable() {
+            @Override
+            public void run() {
+              for (StateDeclaration stateDecl : signature.stateDeclarations().values()) {
+                StateTag<Object, ?> tag;
+                try {
+                  tag =
+                      StateTags.tagForSpec(stateDecl.id(), (StateSpec) stateDecl.field().get(doFn));
+                } catch (IllegalAccessException e) {
+                  throw new RuntimeException(
+                      String.format(
+                          "Error accessing %s for %s",
+                          StateSpec.class.getName(), doFn.getClass().getName()),
+                      e);
+                }
+                stepContext.stateInternals().state(namespace, tag).clear();
+              }
+              cleanupRegistry.invalidate(transformOutputWindow);
+            }
+          };
+
+      evaluationContext.scheduleAfterWindowExpiration(
+          transformOutputWindow.getTransform(), window, windowingStrategy, cleanup);
+      return cleanup;
+    }
+  }
+
+  @AutoValue
+  abstract static class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
+    abstract AppliedPTransform<
+            PCollection<? extends KV<K, Iterable<InputT>>>, PCollectionTuple,
+            StatefulParDo<K, InputT, OutputT>>
+        getTransform();
+
+    abstract StructuralKey<K> getKey();
+
+    abstract BoundedWindow getWindow();
+
+    static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(
+        AppliedPTransform<
+                PCollection<? extends KV<K, Iterable<InputT>>>, PCollectionTuple,
+                StatefulParDo<K, InputT, OutputT>>
+            transform,
+        StructuralKey<K> key,
+        BoundedWindow w) {
+      return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow<>(
+          transform, key, w);
+    }
+  }
+
+  private static class StatefulParDoEvaluator<K, InputT>
+      implements TransformEvaluator<KV<K, Iterable<InputT>>> {
+
+    private final TransformEvaluator<KV<K, InputT>> delegateEvaluator;
+
+    public StatefulParDoEvaluator(TransformEvaluator<KV<K, InputT>> delegateEvaluator) {
+      this.delegateEvaluator = delegateEvaluator;
+    }
+
+    @Override
+    public void processElement(WindowedValue<KV<K, Iterable<InputT>>> gbkResult) throws Exception {
+
+      for (InputT value : gbkResult.getValue().getValue()) {
+        delegateEvaluator.processElement(
+            gbkResult.withValue(KV.of(gbkResult.getValue().getKey(), value)));
+      }
+    }
+
+    @Override
+    public TransformResult<KV<K, Iterable<InputT>>> finishBundle() throws Exception {
+      TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle();
+
+      StepTransformResult.Builder<KV<K, Iterable<InputT>>> regroupedResult =
+          StepTransformResult.<KV<K, Iterable<InputT>>>withHold(
+                  delegateResult.getTransform(), delegateResult.getWatermarkHold())
+              .withTimerUpdate(delegateResult.getTimerUpdate())
+              .withAggregatorChanges(delegateResult.getAggregatorChanges())
+              .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
+              .addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
+
+      // The delegate may have pushed back unprocessed elements across multiple keys and windows.
+      // Since processing is single-threaded per key and window, we don't need to regroup the
+      // outputs, but just make a bunch of singletons
+      for (WindowedValue<?> untypedUnprocessed : delegateResult.getUnprocessedElements()) {
+        WindowedValue<KV<K, InputT>> windowedKv = (WindowedValue<KV<K, InputT>>) untypedUnprocessed;
+        WindowedValue<KV<K, Iterable<InputT>>> pushedBack =
+            windowedKv.withValue(
+                KV.of(
+                    windowedKv.getValue().getKey(),
+                    (Iterable<InputT>)
+                        Collections.singletonList(windowedKv.getValue().getValue())));
+
+        regroupedResult.addUnprocessedElements(pushedBack);
+      }
+
+      return regroupedResult.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 0514c3a..a4c462a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
@@ -50,6 +51,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
             .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
+            .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
             .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 54cab7c..fcefc5f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -89,6 +89,32 @@ class WatermarkCallbackExecutor {
   }
 
   /**
+   * Execute the provided {@link Runnable} after the next call to
+   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window
+   * is guaranteed to be expired.
+   */
+  public void callOnWindowExpiration(
+      AppliedPTransform<?, ?, ?> step,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    WatermarkCallback callback =
+        WatermarkCallback.afterWindowExpiration(window, windowingStrategy, runnable);
+
+    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
+    if (callbackQueue == null) {
+      callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
+      if (callbacks.putIfAbsent(step, callbackQueue) != null) {
+        callbackQueue = callbacks.get(step);
+      }
+    }
+
+    synchronized (callbackQueue) {
+      callbackQueue.offer(callback);
+    }
+  }
+
+  /**
    * Schedule all pending callbacks that must have produced output by the time of the provided
    * watermark.
    */
@@ -112,6 +138,14 @@ class WatermarkCallbackExecutor {
       return new WatermarkCallback(firingAfter, callback);
     }
 
+    public static <W extends BoundedWindow> WatermarkCallback afterWindowExpiration(
+        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
+      // Fire one milli past window expiration (max timestamp plus allowed lateness). This
+      // ensures that all window expiration timers are delivered first
+      Instant firingAfter = window.maxTimestamp().plus(strategy.getAllowedLateness()).plus(1L);
+      return new WatermarkCallback(firingAfter, callback);
+    }
+
     private final Instant fireAfter;
     private final Runnable callback;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
new file mode 100644
index 0000000..ecf11ed
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link StatefulParDoEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class StatefulParDoEvaluatorFactoryTest implements Serializable {
+  @Mock private transient EvaluationContext mockEvaluationContext;
+  @Mock private transient DirectExecutionContext mockExecutionContext;
+  @Mock private transient DirectExecutionContext.DirectStepContext mockStepContext;
+  @Mock private transient ReadyCheckingSideInputReader mockSideInputReader;
+  @Mock private transient UncommittedBundle<Integer> mockUncommittedBundle;
+
+  private static final String KEY = "any-key";
+  private transient StateInternals<Object> stateInternals =
+      CopyOnAccessInMemoryStateInternals.<Object>withUnderlying(KEY, null);
+
+  private static final BundleFactory BUNDLE_FACTORY = ImmutableListBundleFactory.create();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when((StateInternals<Object>) mockStepContext.stateInternals()).thenReturn(stateInternals);
+  }
+
+  @Test
+  public void windowCleanupScheduled() throws Exception {
+    // To test the factory, first we set up a pipeline and then we use the constructed
+    // pipeline to create the right parameters to pass to the factory
+    TestPipeline pipeline = TestPipeline.create();
+
+    final String stateId = "my-state-id";
+
+    // For consistency, window it into FixedWindows. Actually we will fabricate an input bundle.
+    PCollection<KV<String, Integer>> input =
+        pipeline
+            .apply(Create.of(KV.of("hello", 1), KV.of("hello", 2)))
+            .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10))));
+
+    PCollection<Integer> produced =
+        input.apply(
+            ParDo.of(
+                new DoFn<KV<String, Integer>, Integer>() {
+                  @StateId(stateId)
+                  private final StateSpec<Object, ValueState<String>> spec =
+                      StateSpecs.value(StringUtf8Coder.of());
+
+                  @ProcessElement
+                  public void process(ProcessContext c) {}
+                }));
+
+    StatefulParDoEvaluatorFactory<String, Integer, Integer> factory =
+        new StatefulParDoEvaluatorFactory(mockEvaluationContext);
+
+    AppliedPTransform<
+            PCollection<? extends KV<String, Iterable<Integer>>>, PCollectionTuple,
+            StatefulParDo<String, Integer, Integer>>
+        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+
+    // Then there will be a digging down to the step context to get the state internals
+    when(mockEvaluationContext.getExecutionContext(
+            eq(producingTransform), Mockito.<StructuralKey>any()))
+        .thenReturn(mockExecutionContext);
+    when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString()))
+        .thenReturn(mockStepContext);
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(19));
+
+    StateNamespace firstWindowNamespace =
+        StateNamespaces.window(IntervalWindow.getCoder(), firstWindow);
+    StateNamespace secondWindowNamespace =
+        StateNamespaces.window(IntervalWindow.getCoder(), secondWindow);
+    StateTag<Object, ValueState<String>> tag =
+        StateTags.tagForSpec(stateId, StateSpecs.value(StringUtf8Coder.of()));
+
+    // Set up non-empty state. We don't mock + verify calls to clear() but instead
+    // check that state is actually empty. We mustn't care how it is accomplished.
+    stateInternals.state(firstWindowNamespace, tag).write("first");
+    stateInternals.state(secondWindowNamespace, tag).write("second");
+
+    // A single bundle with one element in each of two fixed windows; it should register cleanup
+    // for each window's state merely by having the evaluator created. The cleanup logic does not
+    // depend on the window.
+    CommittedBundle<KV<String, Integer>> inputBundle =
+        BUNDLE_FACTORY
+            .createBundle(input)
+            .add(
+                WindowedValue.of(
+                    KV.of("hello", 1), new Instant(3), firstWindow, PaneInfo.NO_FIRING))
+            .add(
+                WindowedValue.of(
+                    KV.of("hello", 2), new Instant(11), secondWindow, PaneInfo.NO_FIRING))
+            .commit(Instant.now());
+
+    // Merely creating the evaluator should suffice to register the cleanup callback
+    factory.forApplication(producingTransform, inputBundle);
+
+    ArgumentCaptor<Runnable> argumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+    verify(mockEvaluationContext)
+        .scheduleAfterWindowExpiration(
+            eq(producingTransform),
+            eq(firstWindow),
+            Mockito.<WindowingStrategy<?, ?>>any(),
+            argumentCaptor.capture());
+
+    // Should actually clear the state for the first window
+    argumentCaptor.getValue().run();
+    assertThat(stateInternals.state(firstWindowNamespace, tag).read(), nullValue());
+    assertThat(stateInternals.state(secondWindowNamespace, tag).read(), equalTo("second"));
+
+    verify(mockEvaluationContext)
+        .scheduleAfterWindowExpiration(
+            eq(producingTransform),
+            eq(secondWindow),
+            Mockito.<WindowingStrategy<?, ?>>any(),
+            argumentCaptor.capture());
+
+    // Should actually clear the state for the second window
+    argumentCaptor.getValue().run();
+    assertThat(stateInternals.state(secondWindowNamespace, tag).read(), nullValue());
+  }
+
+  /**
+   * A test that explicitly delays a side input so that the main input will have to be reprocessed,
+   * testing that {@code finishBundle()} re-assembles the GBK outputs correctly.
+   */
+  @Test
+  public void testUnprocessedElements() throws Exception {
+    // To test the factory, first we set up a pipeline and then we use the constructed
+    // pipeline to create the right parameters to pass to the factory
+    TestPipeline pipeline = TestPipeline.create();
+
+    final String stateId = "my-state-id";
+
+    // For consistency, window it into FixedWindows. Actually we will fabricate an input bundle.
+    PCollection<KV<String, Integer>> mainInput =
+        pipeline
+            .apply(Create.of(KV.of("hello", 1), KV.of("hello", 2)))
+            .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10))));
+
+    final PCollectionView<List<Integer>> sideInput =
+        pipeline
+            .apply("Create side input", Create.of(42))
+            .apply("Window side input", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
+            .apply("View side input", View.<Integer>asList());
+
+    PCollection<Integer> produced =
+        mainInput.apply(
+            ParDo.withSideInputs(sideInput)
+                .of(
+                    new DoFn<KV<String, Integer>, Integer>() {
+                      @StateId(stateId)
+                      private final StateSpec<Object, ValueState<String>> spec =
+                          StateSpecs.value(StringUtf8Coder.of());
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {}
+                    }));
+
+    StatefulParDoEvaluatorFactory<String, Integer, Integer> factory =
+        new StatefulParDoEvaluatorFactory(mockEvaluationContext);
+
+    // This will be the stateful ParDo from the expansion
+    AppliedPTransform<
+            PCollection<KV<String, Iterable<Integer>>>, PCollectionTuple,
+            StatefulParDo<String, Integer, Integer>>
+        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+
+    // Then there will be a digging down to the step context to get the state internals
+    when(mockEvaluationContext.getExecutionContext(
+            eq(producingTransform), Mockito.<StructuralKey>any()))
+        .thenReturn(mockExecutionContext);
+    when(mockExecutionContext.getOrCreateStepContext(anyString(), anyString()))
+        .thenReturn(mockStepContext);
+    when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
+        .thenReturn(mockUncommittedBundle);
+    when(mockStepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
+
+    // And digging to check whether the window is ready
+    when(mockEvaluationContext.createSideInputReader(anyList())).thenReturn(mockSideInputReader);
+    when(mockSideInputReader.isReady(
+            Matchers.<PCollectionView<?>>any(), Matchers.<BoundedWindow>any()))
+        .thenReturn(false);
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
+
+    // A single bundle with one grouped element in the first fixed window; it should register
+    // cleanup for that window's state merely by having the evaluator created. The cleanup logic
+    // does not depend on the window.
+    WindowedValue<KV<String, Iterable<Integer>>> gbkOutputElement =
+        WindowedValue.of(
+            KV.<String, Iterable<Integer>>of("hello", Lists.newArrayList(1, 13, 15)),
+            new Instant(3),
+            firstWindow,
+            PaneInfo.NO_FIRING);
+    CommittedBundle<KV<String, Iterable<Integer>>> inputBundle =
+        BUNDLE_FACTORY
+            .createBundle(producingTransform.getInput())
+            .add(gbkOutputElement)
+            .commit(Instant.now());
+    TransformEvaluator<KV<String, Iterable<Integer>>> evaluator =
+        factory.forApplication(producingTransform, inputBundle);
+    evaluator.processElement(gbkOutputElement);
+
+    // This should push back every element as a KV<String, Iterable<Integer>>
+    // in the appropriate window. Since the keys are equal they are single-threaded
+    TransformResult<KV<String, Iterable<Integer>>> result = evaluator.finishBundle();
+
+    List<Integer> pushedBackInts = new ArrayList<>();
+
+    for (WindowedValue<?> unprocessedElement : result.getUnprocessedElements()) {
+      WindowedValue<KV<String, Iterable<Integer>>> unprocessedKv =
+          (WindowedValue<KV<String, Iterable<Integer>>>) unprocessedElement;
+
+      assertThat(
+          Iterables.getOnlyElement(unprocessedElement.getWindows()),
+          equalTo((BoundedWindow) firstWindow));
+      assertThat(unprocessedKv.getValue().getKey(), equalTo("hello"));
+      for (Integer i : unprocessedKv.getValue().getValue()) {
+        pushedBackInts.add(i);
+      }
+    }
+    assertThat(pushedBackInts, containsInAnyOrder(1, 13, 15));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 221d942..3f1a3f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -315,7 +315,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *
    * <p>See {@link #getOutputTypeDescriptor} for more discussion.
    */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+  public TypeDescriptor<InputT> getInputTypeDescriptor() {
     return new TypeDescriptor<InputT>(getClass()) {};
   }
 
@@ -330,7 +330,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * for choosing a default output {@code Coder<O>} for the output
    * {@code PCollection<O>}.
    */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+  public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
     return new TypeDescriptor<OutputT>(getClass()) {};
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec2c0e06/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 9bf9003..2d2c1fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -671,7 +671,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     }
 
     @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+    public TypeDescriptor<InputT> getInputTypeDescriptor() {
       return OldDoFn.this.getInputTypeDescriptor();
     }
 
@@ -681,7 +681,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     }
 
     @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
       return OldDoFn.this.getOutputTypeDescriptor();
     }
   }
@@ -746,12 +746,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
     }
 
     @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+    public TypeDescriptor<InputT> getInputTypeDescriptor() {
       return OldDoFn.this.getInputTypeDescriptor();
     }
 
     @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
       return OldDoFn.this.getOutputTypeDescriptor();
     }
   }


[5/5] incubator-beam git commit: This closes #1399

Posted by ke...@apache.org.
This closes #1399


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33c68706
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33c68706
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33c68706

Branch: refs/heads/master
Commit: 33c687069adc1ce4481a0c9637b2a77abb9450b8
Parents: 5e9a80c ec2c0e0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 28 13:03:51 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 28 13:03:51 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |  60 +++-
 .../beam/runners/direct/EvaluationContext.java  |  15 +
 .../beam/runners/direct/ParDoEvaluator.java     |  17 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  55 +++-
 .../direct/ParDoMultiOverrideFactory.java       |  76 ++++-
 .../ParDoSingleViaMultiOverrideFactory.java     |   6 +-
 .../direct/StatefulParDoEvaluatorFactory.java   | 256 ++++++++++++++++
 .../direct/TransformEvaluatorRegistry.java      |   2 +
 .../direct/WatermarkCallbackExecutor.java       |  34 +++
 .../StatefulParDoEvaluatorFactoryTest.java      | 300 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |   4 +-
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   8 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  10 -
 .../sdk/transforms/reflect/DoFnSignature.java   |   1 +
 .../apache/beam/sdk/transforms/ParDoTest.java   | 177 ++++++++++-
 15 files changed, 961 insertions(+), 60 deletions(-)
----------------------------------------------------------------------



[3/5] incubator-beam git commit: Add State parameter support to SimpleDoFnRunner

Posted by ke...@apache.org.
Add State parameter support to SimpleDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e17dc4af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e17dc4af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e17dc4af

Branch: refs/heads/master
Commit: e17dc4af9f7de717872d6c6f0ab52e0498f3b782
Parents: 1b7b065
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 9 21:10:51 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 28 11:43:21 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 60 +++++++++++++--
 .../org/apache/beam/sdk/transforms/ParDo.java   | 10 ---
 .../sdk/transforms/reflect/DoFnSignature.java   |  1 +
 .../apache/beam/sdk/transforms/ParDoTest.java   | 79 ++++++++++++++++----
 4 files changed, 118 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e17dc4af/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index f611c0a..68751f0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -25,7 +25,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -55,6 +58,10 @@ import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
@@ -87,6 +94,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
   private final boolean observesWindow;
 
+  private final DoFnSignature signature;
+
+  private final Coder<BoundedWindow> windowCoder;
+
+  // Because of setKey(Object), stateInternals() must be re-fetched from stepContext on each access
+  private final StepContext stepContext;
+
   public SimpleDoFnRunner(
       PipelineOptions options,
       DoFn<InputT, OutputT> fn,
@@ -98,11 +112,20 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
-    this.observesWindow =
-        DoFnSignatures.getSignature(fn.getClass()).processElement().observesWindow();
+    this.signature = DoFnSignatures.getSignature(fn.getClass());
+    this.observesWindow = signature.processElement().observesWindow();
     this.invoker = DoFnInvokers.invokerFor(fn);
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
+    this.stepContext = stepContext;
+
+    // This is a cast of an _invariant_ coder. But we are assured by pipeline validation
+    // that it really is the coder for whatever BoundedWindow subclass is provided
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> untypedCoder =
+        (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder();
+    this.windowCoder = untypedCoder;
+
     this.context =
         new DoFnContext<>(
             options,
@@ -113,7 +136,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
             sideOutputTags,
             stepContext,
             aggregatorFactory,
-            windowingStrategy == null ? null : windowingStrategy.getWindowFn());
+            windowingStrategy.getWindowFn());
   }
 
   @Override
@@ -427,6 +450,23 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     final DoFnContext<InputT, OutputT> context;
     final WindowedValue<InputT> windowedValue;
 
+    /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
+    @Nullable private StateNamespace namespace;
+
+    /**
+     * The state namespace for this context.
+     *
+     * <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this
+     * represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly
+     * one window when state or timers are relevant.
+     */
+    private StateNamespace getNamespace() {
+      if (namespace == null) {
+        namespace = StateNamespaces.window(windowCoder, window());
+      }
+      return namespace;
+    }
+
     private DoFnProcessContext(
         DoFn<InputT, OutputT> fn,
         DoFnContext<InputT, OutputT> context,
@@ -564,8 +604,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public State state(String timerId) {
-      throw new UnsupportedOperationException("State parameters are not supported.");
+    public State state(String stateId) {
+      try {
+        StateSpec<?, ?> spec =
+            (StateSpec<?, ?>) signature.stateDeclarations().get(stateId).field().get(fn);
+        return stepContext
+            .stateInternals()
+            .state(getNamespace(), StateTags.tagForSpec(stateId, (StateSpec) spec));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -593,7 +641,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
         @Override
         public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
+          return stepContext.stateInternals();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e17dc4af/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 215ae6a..9453294 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -596,16 +596,6 @@ public class ParDo {
 
     // To be removed when the features are complete and runners have their own adequate
     // rejection logic
-    if (!signature.stateDeclarations().isEmpty()) {
-      throw new UnsupportedOperationException(
-          String.format("Found %s annotations on %s, but %s cannot yet be used with state.",
-              DoFn.StateId.class.getSimpleName(),
-              fn.getClass().getName(),
-              DoFn.class.getSimpleName()));
-    }
-
-    // To be removed when the features are complete and runners have their own adequate
-    // rejection logic
     if (!signature.timerDeclarations().isEmpty()) {
       throw new UnsupportedOperationException(
           String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e17dc4af/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 1c16030..cd93583 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -523,6 +523,7 @@ public abstract class DoFnSignature {
 
     static StateDeclaration create(
         String id, Field field, TypeDescriptor<? extends State> stateType) {
+      field.setAccessible(true);
       return new AutoValue_DoFnSignature_StateDeclaration(id, field, stateType);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e17dc4af/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3c3e266..be1eaa4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -36,6 +36,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -68,6 +71,7 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -1459,27 +1463,70 @@ public class ParDoTest implements Serializable {
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
 
-  /**
-   * A test that we properly reject {@link DoFn} implementations that
-   * include {@link DoFn.StateId} annotations, for now.
-   */
   @Test
-  public void testUnsupportedState() {
-    thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("cannot yet be used with state");
+  @Category(RunnableOnService.class)
+  public void testValueState() {
+    final String stateId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+            c.output(currentValue);
+            state.write(currentValue + 1);
+          }
+        };
 
-    DoFn<KV<String, String>, KV<String, String>> fn =
-        new DoFn<KV<String, String>, KV<String, String>>() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> output =
+        p.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
+            .apply(ParDo.of(fn));
 
-      @StateId("foo")
-      private final StateSpec<Object, ValueState<Integer>> intState =
-          StateSpecs.value(VarIntCoder.of());
+    PAssert.that(output).containsInAnyOrder(0, 1, 2);
+    p.run();
+  }
 
-      @ProcessElement
-      public void processElement(ProcessContext c) { }
-    };
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBagSTate() {
+    final String stateId = "foo";
+
+    DoFn<KV<String, Integer>, List<Integer>> fn =
+        new DoFn<KV<String, Integer>, List<Integer>>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, BagState<Integer>> bufferState =
+              StateSpecs.bag(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) BagState<Integer> state) {
+            Iterable<Integer> currentValue = state.read();
+            state.add(c.element().getValue());
+            if (Iterables.size(state.read()) >= 4) {
+              List<Integer> sorted = Lists.newArrayList(currentValue);
+              Collections.sort(sorted);
+              c.output(sorted);
+            }
+          }
+        };
 
-    ParDo.of(fn);
+    Pipeline p = TestPipeline.create();
+    PCollection<List<Integer>> output =
+        p.apply(
+                Create.of(
+                    KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
+            .apply(ParDo.of(fn));
+
+    PAssert.that(output).containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97));
+    p.run();
   }
 
   @Test