You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/25 19:11:45 UTC

[1/4] incubator-beam git commit: Port runners to use GroupAlsoByWindows via StateInternalsFactory

Repository: incubator-beam
Updated Branches:
  refs/heads/master cf1464465 -> 7809f6bd2


Port runners to use GroupAlsoByWindows via StateInternalsFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/902997d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/902997d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/902997d0

Branch: refs/heads/master
Commit: 902997d040023c83d23e57362bdfb2d62c53d142
Parents: d2594e0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 18 14:33:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 16 ++--
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   | 10 ++-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 10 +--
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   | 79 +++++++++++++++++---
 .../sdk/util/GroupAlsoByWindowsProperties.java  | 50 +++++++++++--
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  8 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      | 30 ++++++++
 .../beam/runners/direct/ParDoEvaluator.java     |  4 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  6 ++
 .../direct/ParDoSingleEvaluatorFactory.java     |  6 ++
 .../beam/runners/direct/ParDoEvaluatorTest.java |  1 +
 .../FlinkGroupAlsoByWindowWrapper.java          | 31 +++++---
 .../spark/translation/TransformTranslator.java  | 70 ++++++++++++++++-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 41 +++++++++-
 14 files changed, 309 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 73244f7..0d320bc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -44,8 +45,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
-          WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
+          WindowingStrategy<?, W> strategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
   }
 
   protected final Aggregator<Long, Long> droppedDueToClosedWindow =
@@ -55,15 +58,18 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
       createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
 
   private final WindowingStrategy<Object, W> windowingStrategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
 
   private GroupAlsoByWindowViaWindowSetDoFn(
       WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
       SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
     @SuppressWarnings("unchecked")
     WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
     this.windowingStrategy = noWildcard;
     this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
   }
 
   @Override
@@ -72,11 +78,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
     K key = c.element().key();
     TimerInternals timerInternals = c.windowingInternals().timerInternals();
-
-    // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only
-    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
-    @SuppressWarnings("unchecked")
-    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<>(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
index f5de0bc..b575559 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -51,9 +52,12 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
    * @param windowingStrategy The window function and trigger to use for grouping
    * @param inputCoder the input coder to use
    */
-  public static <K, V, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W>
-      createDefault(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
+  public static <K, V, W extends BoundedWindow>
+      GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault(
+          WindowingStrategy<?, W> windowingStrategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          Coder<V> inputCoder) {
     return new GroupAlsoByWindowsViaOutputBufferDoFn<>(
-        windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+        windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
index d364168..d185a24 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 
 import com.google.common.collect.Iterables;
@@ -37,13 +38,16 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
 
   private final WindowingStrategy<?, W> strategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
 
   public GroupAlsoByWindowsViaOutputBufferDoFn(
       WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
       SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
     this.strategy = windowingStrategy;
     this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
   }
 
   @Override
@@ -55,11 +59,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
     // timer manager from the context because it doesn't exist. So we create one and emulate the
     // watermark, knowing that we have all data and it is in timestamp order.
     BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
-
-    // It is the responsibility of the user of GroupAlsoByWindowsViaOutputBufferDoFn to only
-    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
-    @SuppressWarnings("unchecked")
-    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
         new ReduceFnRunner<K, InputT, OutputT, W>(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index 9450495..8a0152e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -68,7 +71,7 @@ import java.util.List;
 public class GroupByKeyViaGroupByKeyOnly<K, V>
     extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
 
-  private GroupByKey<K, V> gbkTransform;
+  private final GroupByKey<K, V> gbkTransform;
 
   public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
     this.gbkTransform = originalTransform;
@@ -161,13 +164,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
   }
 
   /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
+   * Runner-specific primitive that takes a collection of timestamp-ordered values associated with
+   * each key, groups the values by window, merges windows as needed, and for each window in each
+   * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with
+   * the timestamp derived from that window.
    */
-  private static class GroupAlsoByWindow<K, V>
+  public static class GroupAlsoByWindow<K, V>
       extends PTransform<
           PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
     private final WindowingStrategy<?, ?> windowingStrategy;
@@ -176,8 +178,57 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       this.windowingStrategy = windowingStrategy;
     }
 
+    public WindowingStrategy<?, ?> getWindowingStrategy() {
+      return windowingStrategy;
+    }
+
+    private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder(
+        Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      // Coder<KV<...>> --> KvCoder<...>
+      checkArgument(inputCoder instanceof KvCoder,
+          "%s requires a %s<...> but got %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          inputCoder);
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> kvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder;
+      return kvCoder;
+    }
+
+    public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      return getKvCoder(inputCoder).getKeyCoder();
+    }
+
+    public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      // Coder<Iterable<...>> --> IterableCoder<...>
+      Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder =
+          getKvCoder(inputCoder).getValueCoder();
+      checkArgument(iterableWindowedValueCoder instanceof IterableCoder,
+          "%s requires a %s<..., %s> but got a %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          IterableCoder.class.getSimpleName(),
+          iterableWindowedValueCoder);
+      IterableCoder<WindowedValue<V>> iterableCoder =
+          (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder;
+
+      // Coder<WindowedValue<...>> --> WindowedValueCoder<...>
+      Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder();
+      checkArgument(iterableElementCoder instanceof WindowedValueCoder,
+          "%s requires a %s<..., %s<%s>> but got a %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          IterableCoder.class.getSimpleName(),
+          WindowedValueCoder.class.getSimpleName(),
+          iterableElementCoder);
+      WindowedValueCoder<V> windowedValueCoder =
+          (WindowedValueCoder<V>) iterableElementCoder;
+
+      return windowedValueCoder.getValueCoder();
+    }
+
     @Override
-    @SuppressWarnings("unchecked")
     public PCollection<KV<K, Iterable<V>>> apply(
         PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
       @SuppressWarnings("unchecked")
@@ -197,16 +248,20 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
       Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
 
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), windowingStrategy, input.isBounded())
           .setCoder(outputKvCoder);
     }
 
     private <W extends BoundedWindow>
         GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+            WindowingStrategy<?, W> strategy,
+            StateInternalsFactory<K> stateInternalsFactory,
+            Coder<V> inputIterableElementValueCoder) {
       return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+          strategy,
+          stateInternalsFactory,
+          SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index fe2a495..43c287e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -32,10 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
@@ -61,7 +67,7 @@ public class GroupAlsoByWindowsProperties {
    */
   public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
     <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
-    forStrategy(WindowingStrategy<?, W> strategy);
+    forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
   }
 
   /**
@@ -77,10 +83,15 @@ public class GroupAlsoByWindowsProperties {
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
 
+    // This key should never actually be used, though it is eagerly passed to the
+    // StateInternalsFactory so must be non-null
+    @SuppressWarnings("unchecked")
+    K fakeKey = (K) "this key should never be used";
+
     DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
         gabwFactory,
         windowingStrategy,
-        (K) null, // key should never be used
+        fakeKey,
         Collections.<WindowedValue<InputT>>emptyList());
 
     assertThat(result.peekOutputElements(), hasSize(0));
@@ -599,11 +610,14 @@ public class GroupAlsoByWindowsProperties {
       K key,
       Collection<WindowedValue<InputT>> values) throws Exception {
 
-    TupleTag<KV<K, OutputT>> outputTag = new TupleTag<>();
-    DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager();
+    final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>();
 
     DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
-        DoFnTester.of(gabwFactory.forStrategy(windowingStrategy));
+        DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache));
+
+    // Though we use a DoFnTester, the function itself is instantiated directly by the
+    // runner and should not be serialized; it may not even be serializable.
+    tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     tester.startBundle();
     tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
     tester.finishBundle();
@@ -620,4 +634,28 @@ public class GroupAlsoByWindowsProperties {
     return new IntervalWindow(new Instant(start), new Instant(end));
   }
 
+  private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
+    private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+
+    private CachingStateInternalsFactory() {
+      this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public StateInternals<K> stateInternalsForKey(K key) {
+      try {
+        return stateInternalsCache.get(key);
+      } catch (Exception exc) {
+        throw new RuntimeException(exc);
+      }
+    }
+  }
+
+  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+    @Override
+    public StateInternals<K> load(K key) throws Exception {
+      return InMemoryStateInternals.forKey(key);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
index 4ac6164..1f02a8f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,10 +43,13 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
     }
 
     @Override
-    public <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W>
-        forStrategy(WindowingStrategy<?, W> windowingStrategy) {
+    public <W extends BoundedWindow>
+        GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+            WindowingStrategy<?, W> windowingStrategy,
+            StateInternalsFactory<K> stateInternalsFactory) {
       return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
           windowingStrategy,
+          stateInternalsFactory,
           SystemReduceFn.<K, InputT, W>buffering(inputCoder));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9782ab1..5d3ab3f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
@@ -26,11 +27,14 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -94,9 +98,19 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       WindowingStrategy<?, BoundedWindow> windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
 
+      DirectStepContext stepContext =
+          evaluationContext
+              .getExecutionContext(application, inputBundle.getKey())
+              .getOrCreateStepContext(
+                  evaluationContext.getStepName(application), application.getTransform().getName());
+
+      StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals();
+
       DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
           GroupAlsoByWindowViaWindowSetDoFn.create(
               windowingStrategy,
+              // new DirectStateInternalsFactory<K, V>(stepContext),
+              new ConstantStateInternalsFactory<K>(stateInternals),
               SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
 
       TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
@@ -105,6 +119,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       this.gabwParDoEvaluator =
           ParDoEvaluator.create(
               evaluationContext,
+              stepContext,
               inputBundle,
               application,
               gabwDoFn,
@@ -124,4 +139,19 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       return gabwParDoEvaluator.finishBundle();
     }
   }
+
+  private static final class ConstantStateInternalsFactory<K>
+      implements StateInternalsFactory<K> {
+    private final StateInternals<K> stateInternals;
+
+    private ConstantStateInternalsFactory(StateInternals<K> stateInternals) {
+      this.stateInternals = stateInternals;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return stateInternals;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 58cee4d..485cf4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -46,6 +46,7 @@ import java.util.Map;
 class ParDoEvaluator<T> implements TransformEvaluator<T> {
   public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
+      DirectStepContext stepContext,
       CommittedBundle<InputT> inputBundle,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
       DoFn<InputT, OutputT> fn,
@@ -55,9 +56,6 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
       Map<TupleTag<?>, PCollection<?>> outputs) {
     DirectExecutionContext executionContext =
         evaluationContext.getExecutionContext(application, inputBundle.getKey());
-    String stepName = evaluationContext.getStepName(application);
-    DirectStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName);
 
     CounterSet counters = evaluationContext.createCounterSet();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index b87cd3e..eda3db4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -77,10 +78,15 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
     @SuppressWarnings({"unchecked", "rawtypes"})
     ThreadLocal<DoFn<InT, OuT>> fnLocal =
         (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
+    String stepName = evaluationContext.getStepName(application);
+    DirectStepContext stepContext =
+        evaluationContext.getExecutionContext(application, inputBundle.getKey())
+            .getOrCreateStepContext(stepName, stepName);
     try {
       TransformEvaluator<InT> parDoEvaluator =
           ParDoEvaluator.create(
               evaluationContext,
+              stepContext,
               inputBundle,
               application,
               fnLocal.get(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index e9c7dd6..044abdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -73,6 +74,10 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
       CommittedBundle<InputT> inputBundle,
       EvaluationContext evaluationContext) {
     TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
+    String stepName = evaluationContext.getStepName(application);
+    DirectStepContext stepContext =
+        evaluationContext.getExecutionContext(application, inputBundle.getKey())
+            .getOrCreateStepContext(stepName, stepName);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
@@ -81,6 +86,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
       ParDoEvaluator<InputT> parDoEvaluator =
           ParDoEvaluator.create(
               evaluationContext,
+              stepContext,
               inputBundle,
               application,
               fnLocal.get(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 3c9c9ee..bce37e4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -160,6 +160,7 @@ public class ParDoEvaluatorTest {
 
     return ParDoEvaluator.create(
         evaluationContext,
+        stepContext,
         inputBundle,
         (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
         fn,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 3f845cf..0e977db 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Serializable;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -47,6 +48,8 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -262,12 +265,15 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
    */
   private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
     if (this.operator == null) {
+
+      StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory();
+
       if (this.combineFn == null) {
         // Thus VOUT == Iterable<VIN>
         Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
 
         this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+            (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
       } else {
         Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
 
@@ -275,14 +281,14 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
             .withInputCoder(combineFn, coderRegistry, inputKvCoder);
 
         this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+            (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
       }
     }
     return this.operator;
   }
 
   private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
-    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+    context.setElement(workItem);
     operator.processElement(context);
   }
 
@@ -438,8 +444,6 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
     private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
 
-    private FlinkStateInternals<K> stateInternals;
-
     private KeyedWorkItem<K, VIN> element;
 
     public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
@@ -452,10 +456,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
       this.timerInternals = checkNotNull(timerInternals);
     }
 
-    public void setElement(KeyedWorkItem<K, VIN> element,
-                           FlinkStateInternals<K> stateForKey) {
+    public void setElement(KeyedWorkItem<K, VIN> element) {
       this.element = element;
-      this.stateInternals = stateForKey;
     }
 
     public void setCurrentInputWatermark(Instant watermark) {
@@ -509,8 +511,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
       return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
 
         @Override
-        public org.apache.beam.sdk.util.state.StateInternals stateInternals() {
-          return stateInternals;
+        public StateInternals stateInternals() {
+          throw new UnsupportedOperationException("stateInternals() is not available");
         }
 
         @Override
@@ -628,4 +630,13 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
     // restore the timerInternals.
     this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
   }
+
+  private class GroupAlsoByWindowWrapperStateInternalsFactory implements
+      StateInternalsFactory<K>, Serializable {
+
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return getStateInternalsForKey(key);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 34a0ede..c5d5802 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+
 package org.apache.beam.runners.spark.translation;
 
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
@@ -32,6 +33,7 @@ import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
@@ -47,8 +49,16 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AssignWindowsDoFn;
+import org.apache.beam.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -78,12 +88,12 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import scala.Tuple2;
 
 /**
@@ -161,6 +171,55 @@ public final class TransformTranslator {
     };
   }
 
+  private static <K, V, W extends BoundedWindow>
+      TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
+    return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
+      @Override
+      public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?>)
+                context.getInputRDD(transform);
+
+        Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder =
+            context.getInput(transform).getCoder();
+        Coder<K> keyCoder = transform.getKeyCoder(inputCoder);
+        Coder<V> valueCoder = transform.getValueCoder(inputCoder);
+
+        @SuppressWarnings("unchecked")
+        KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+            (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder();
+        Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+        IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+            (IterableCoder<WindowedValue<V>>) inputValueCoder;
+        Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+        WindowedValueCoder<V> inputIterableWindowedValueCoder =
+            (WindowedValueCoder<V>) inputIterableElementCoder;
+
+        Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, W> windowingStrategy =
+            (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+
+        DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
+            new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+                windowingStrategy,
+                new InMemoryStateInternalsFactory<K>(),
+                SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+
+        // GroupAlsoByWindow currently uses a dummy in-memory StateInternals
+        JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD =
+            inRDD.mapPartitions(
+                new DoFnFunction<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>>(
+                    gabwDoFn, context.getRuntimeContext(), null));
+
+        context.setOutputRDD(transform, outRDD);
+      }
+    };
+  }
+
   private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
 
   private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
@@ -815,6 +874,7 @@ public final class TransformTranslator {
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(GroupByKeyOnly.class, gbk());
+    EVALUATORS.put(GroupAlsoByWindow.class, gabw());
     EVALUATORS.put(Combine.GroupedValues.class, grouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
@@ -853,4 +913,12 @@ public final class TransformTranslator {
       return getTransformEvaluator(clazz);
     }
   }
+
+  private static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>,
+      Serializable {
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return InMemoryStateInternals.forKey(key);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index c38f0ab..c8bd5de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -131,6 +131,28 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
+   * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
+   */
+  public enum CloningBehavior {
+    CLONE,
+    DO_NOT_CLONE;
+  }
+
+  /**
+   * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
+   */
+  public void setCloningBehavior(CloningBehavior newValue) {
+    this.cloningBehavior = newValue;
+  }
+
+  /**
+   * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test.
+   */
+  public CloningBehavior getCloningBehavior() {
+    return cloningBehavior;
+  }
+
+  /**
    * A convenience operation that first calls {@link #startBundle},
    * then calls {@link #processElement} on each of the input elements, then
    * calls {@link #finishBundle}, then returns the result of
@@ -644,6 +666,13 @@ public class DoFnTester<InputT, OutputT> {
   /** The original DoFn under test. */
   private final DoFn<InputT, OutputT> origFn;
 
+  /**
+   * Whether to clone the original {@link DoFn} or just use it as-is.
+   *
+   * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
+   */
+  private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
+
   /** The side input values to provide to the DoFn under test. */
   private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
       new HashMap<>();
@@ -676,10 +705,14 @@ public class DoFnTester<InputT, OutputT> {
 
   @SuppressWarnings("unchecked")
   private void initializeState() {
-    fn = (DoFn<InputT, OutputT>)
-        SerializableUtils.deserializeFromByteArray(
-            SerializableUtils.serializeToByteArray(origFn),
-            origFn.toString());
+    if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
+      fn = origFn;
+    } else {
+      fn = (DoFn<InputT, OutputT>)
+          SerializableUtils.deserializeFromByteArray(
+              SerializableUtils.serializeToByteArray(origFn),
+              origFn.toString());
+    }
     outputs = new HashMap<>();
     accumulators = new HashMap<>();
   }



[2/4] incubator-beam git commit: Introduce StateInternalsFactory

Posted by ke...@apache.org.
Introduce StateInternalsFactory

This interface vends a StateInternals for a particular key.
Various DoFns that access StateInternals via ProcessContext
will be ported to instead accept a StateInternalsFactory.

The StateInternalsFactory must be provided by the runner
to enable provisioning of the appropriate run-time
StateInternals.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d2594e0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d2594e0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d2594e0f

Branch: refs/heads/master
Commit: d2594e0fc588849dba905e168523deacd4f1f4dd
Parents: 20244ba
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 18 13:56:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700

----------------------------------------------------------------------
 .../sdk/util/state/StateInternalsFactory.java   | 36 ++++++++++++++++++++
 1 file changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d2594e0f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
new file mode 100644
index 0000000..54355c7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+import java.io.Serializable;
+
+/**
+ * A factory for providing {@link StateInternals} for a particular key.
+ *
+ * <p>Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn},
+ * albeit at execution time, implementations should also implement {@link Serializable}.
+ */
+@Experimental(Kind.STATE)
+public interface StateInternalsFactory<K> {
+
+  /** Returns {@link StateInternals} for the provided key. */
+  StateInternals<K> stateInternalsForKey(K key);
+}


[3/4] incubator-beam git commit: Tidy WriteWithShardingFactory

Posted by ke...@apache.org.
Tidy WriteWithShardingFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20244bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20244bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20244bad

Branch: refs/heads/master
Commit: 20244badc3d1b8defd9e5b9a718f54475c502365
Parents: cf14644
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 19 19:16:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/WriteWithShardingFactory.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20244bad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 93f2408..d6ee6ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -78,7 +78,8 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
           Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
               .withAllowedLateness(Duration.ZERO)
               .discardingFiredPanes());
-      final PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView());
+      final PCollectionView<Long> numRecords = records
+          .apply("CountRecords", Count.<T>globally().asSingletonView());
       PCollection<T> resharded =
           records
               .apply(
@@ -107,7 +108,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
     private final PCollectionView<Long> numRecords;
     private final int randomExtraShards;
     private int currentShard;
-    private int maxShards;
+    private int maxShards = 0;
 
     KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
       this.numRecords = numRecords;
@@ -116,7 +117,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      if (maxShards == 0L) {
+      if (maxShards == 0) {
         maxShards = calculateShards(c.sideInput(numRecords));
         currentShard = ThreadLocalRandom.current().nextInt(maxShards);
       }


[4/4] incubator-beam git commit: This closes #697

Posted by ke...@apache.org.
This closes #697


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7809f6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7809f6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7809f6bd

Branch: refs/heads/master
Commit: 7809f6bd2cd3605ce389a174aba242ea08860f88
Parents: cf14644 902997d
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 25 12:11:27 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 12:11:27 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 16 ++--
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   | 10 ++-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 10 +--
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   | 79 +++++++++++++++++---
 .../sdk/util/GroupAlsoByWindowsProperties.java  | 50 +++++++++++--
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  8 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      | 30 ++++++++
 .../beam/runners/direct/ParDoEvaluator.java     |  4 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |  6 ++
 .../direct/ParDoSingleEvaluatorFactory.java     |  6 ++
 .../direct/WriteWithShardingFactory.java        |  7 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |  1 +
 .../FlinkGroupAlsoByWindowWrapper.java          | 31 +++++---
 .../spark/translation/TransformTranslator.java  | 70 ++++++++++++++++-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 41 +++++++++-
 .../sdk/util/state/StateInternalsFactory.java   | 36 +++++++++
 16 files changed, 349 insertions(+), 56 deletions(-)
----------------------------------------------------------------------