You are viewing a plain-text version of this content. The canonical link to it (shown as "here" on the original page) was not preserved in this text copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/08 01:26:31 UTC

[1/2] beam git commit: BEAM-1887 Switch Apex ParDo to new DoFn.

Repository: beam
Updated Branches:
  refs/heads/master 1594849da -> 7f55746ce


BEAM-1887 Switch Apex ParDo to new DoFn.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8484ef91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8484ef91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8484ef91

Branch: refs/heads/master
Commit: 8484ef9168940cf929648fd53982acc6740c5107
Parents: 1594849
Author: Thomas Weise <th...@apache.org>
Authored: Tue Apr 4 00:33:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 18:25:55 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   2 -
 .../apex/translation/GroupByKeyTranslator.java  |   4 +-
 .../apex/translation/ParDoTranslator.java       |  15 +-
 .../apex/translation/TranslationContext.java    |  16 +-
 .../translation/WindowAssignTranslator.java     |  58 ++----
 .../operators/ApexGroupByKeyOperator.java       |  21 +--
 .../operators/ApexParDoOperator.java            | 187 +++++++++++++++----
 .../operators/ApexProcessFnOperator.java        | 184 ++++++++++++++++++
 .../translation/utils/ApexStateInternals.java   |  73 ++++++--
 .../translation/utils/StateInternalsProxy.java  |  67 +++++++
 .../translation/ApexGroupByKeyOperatorTest.java |   2 +-
 .../apex/translation/ParDoTranslatorTest.java   |   2 +-
 .../utils/ApexStateInternalsTest.java           |  25 ++-
 .../streaming/SingletonKeyedWorkItemCoder.java  |   2 +-
 14 files changed, 512 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index d23fc14..1c99f8d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -67,8 +67,6 @@ import org.apache.hadoop.conf.Configuration;
  * A {@link PipelineRunner} that translates the
  * pipeline to an Apex DAG and executes it on an Apex cluster.
  *
- * <p>Currently execution is always in embedded mode,
- * launch on Hadoop cluster will be added in subsequent iteration.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class ApexRunner extends PipelineRunner<ApexRunnerResult> {

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
index b46e3eb..2e0bae7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
@@ -31,9 +31,9 @@ class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>
 
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
+    PCollection<KV<K, V>> input = context.getInput();
     ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
-        input, context.<K>stateInternalsFactory()
+        input, context.getStateBackend()
         );
     context.addOperator(group, group.output);
     context.addStream(input, group.input);

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 75722c7..fa9d21d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -82,23 +82,22 @@ class ParDoTranslator<InputT, OutputT>
     }
 
     List<TaggedPValue> outputs = context.getOutputs();
-    PCollection<InputT> input = (PCollection<InputT>) context.getInput();
+    PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
     WindowedValueCoder<InputT> wvInputCoder =
         FullWindowedValueCoder.of(
             inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator =
-        new ApexParDoOperator<>(
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
             context.getPipelineOptions(),
             doFn,
             transform.getMainOutputTag(),
             transform.getSideOutputTags().getAll(),
-            ((PCollection<InputT>) context.getInput()).getWindowingStrategy(),
+            input.getWindowingStrategy(),
             sideInputs,
             wvInputCoder,
-            context.<Void>stateInternalsFactory());
+            context.getStateBackend());
 
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
     for (TaggedPValue output : outputs) {
@@ -126,15 +125,15 @@ class ParDoTranslator<InputT, OutputT>
     context.addOperator(operator, ports);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {
-      addSideInputs(operator, sideInputs, context);
+      addSideInputs(operator.sideInput1, sideInputs, context);
     }
   }
 
   static void addSideInputs(
-      ApexParDoOperator<?, ?> operator,
+      Operator.InputPort<?> sideInputPort,
       List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
-    Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+    Operator.InputPort<?>[] sideInputPorts = {sideInputPort};
     if (sideInputs.size() > sideInputPorts.length) {
       PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
       context.addStream(unionCollection, sideInputPorts[0]);

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index fc49fc7..81507ef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
-import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -89,16 +89,16 @@ class TranslationContext {
     return getCurrentTransform().getInputs();
   }
 
-  public PValue getInput() {
-    return Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
+  public <InputT extends PValue> InputT getInput() {
+    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
   }
 
   public List<TaggedPValue> getOutputs() {
     return getCurrentTransform().getOutputs();
   }
 
-  public PValue getOutput() {
-    return Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
+  public <OutputT extends PValue> OutputT getOutput() {
+    return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
   }
 
   private AppliedPTransform<?, ?, ?> getCurrentTransform() {
@@ -192,10 +192,10 @@ class TranslationContext {
   }
 
   /**
-   * Return the {@link StateInternalsFactory} for the pipeline translation.
+   * Return the state backend for the pipeline translation.
    * @return
    */
-  public <K> StateInternalsFactory<K> stateInternalsFactory() {
-    return new ApexStateInternals.ApexStateInternalsFactory();
+  public ApexStateBackend getStateBackend() {
+    return new ApexStateInternals.ApexStateBackend();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
index 6106f75..f34f9ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
@@ -18,61 +18,35 @@
 
 package org.apache.beam.runners.apex.translation;
 
-import java.util.Collections;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
-import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 
 /**
- * {@link Window} is translated to {link ApexParDoOperator} that wraps an {@link
- * AssignWindowsDoFn}.
+ * {@link Window} is translated to {@link ApexProcessFnOperator#assignWindows}.
  */
 class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
   private static final long serialVersionUID = 1L;
 
   @Override
   public void translate(Window.Assign<T> transform, TranslationContext context) {
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    PCollection<T> input = (PCollection<T>) context.getInput();
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<T, BoundedWindow> windowingStrategy =
-        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+    PCollection<T> output = context.getOutput();
+    PCollection<T> input = context.getInput();
 
-    OldDoFn<T, T> fn =
-        (transform.getWindowFn() == null)
-            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
-            : new AssignWindowsDoFn<>(transform.getWindowFn());
+   if (transform.getWindowFn() == null) {
+     // no work to do
+     context.addAlias(output, input);
+   } else {
+      @SuppressWarnings("unchecked")
+      WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) transform.getWindowFn();
+      ApexProcessFnOperator<T> operator = ApexProcessFnOperator.assignWindows(windowFn,
+          context.getPipelineOptions());
+      context.addOperator(operator, operator.outputPort);
+      context.addStream(context.getInput(), operator.inputPort);
+   }
 
-    ApexParDoOperator<T, T> operator =
-        new ApexParDoOperator<T, T>(
-            context.getPipelineOptions().as(ApexPipelineOptions.class),
-            fn,
-            new TupleTag<T>(),
-            TupleTagList.empty().getAll(),
-            windowingStrategy,
-            Collections.<PCollectionView<?>>emptyList(),
-            WindowedValue.getFullCoder(
-                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
-            context.<Void>stateInternalsFactory());
-    context.addOperator(operator, operator.output);
-    context.addStream(context.getInput(), operator.input);
   }
 
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 3508c3e..4551c9c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
@@ -95,7 +96,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private final SerializablePipelineOptions serializedOptions;
   @Bind(JavaSerializer.class)
   private final StateInternalsFactory<K> stateInternalsFactory;
-  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
   private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
   private transient ProcessContext context;
@@ -135,13 +135,13 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
 
   @SuppressWarnings("unchecked")
   public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
-      StateInternalsFactory<K> stateInternalsFactory) {
+      ApexStateBackend stateBackend) {
     checkNotNull(pipelineOptions);
     this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
     this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
     this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-    this.stateInternalsFactory = stateInternalsFactory;
+    this.stateInternalsFactory = stateBackend.newStateInternalsFactory(keyCoder);
   }
 
   @SuppressWarnings("unused") // for Kryo
@@ -222,18 +222,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   }
 
   private StateInternals<K> getStateInternalsForKey(K key) {
-    final ByteBuffer keyBytes;
-    try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
-    if (stateInternals == null) {
-      stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-      perKeyStateInternals.put(keyBytes, stateInternals);
-    }
-    return stateInternals;
+    return stateInternalsFactory.stateInternalsForKey(key);
   }
 
   private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
@@ -423,7 +412,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
    * An implementation of Beam's {@link TimerInternals}.
    *
    */
-  public class ApexTimerInternals implements TimerInternals {
+  private class ApexTimerInternals implements TimerInternals {
 
     @Deprecated
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 7f2512a..1fc91c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -25,41 +25,52 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +84,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   @Bind(JavaSerializer.class)
   private final SerializablePipelineOptions pipelineOptions;
   @Bind(JavaSerializer.class)
-  private final OldDoFn<InputT, OutputT> doFn;
+  private final DoFn<InputT, OutputT> doFn;
   @Bind(JavaSerializer.class)
   private final TupleTag<OutputT> mainOutputTag;
   @Bind(JavaSerializer.class)
@@ -83,6 +94,12 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   @Bind(JavaSerializer.class)
   private final List<PCollectionView<?>> sideInputs;
 
+  private StateInternalsProxy<?> currentKeyStateInternals;
+  // TODO: if the operator gets restored to checkpointed state due to a failure,
+  // the timer state is lost.
+  private final transient CurrentKeyTimerInternals<Object> currentKeyTimerInternals =
+      new CurrentKeyTimerInternals<>();
+
   private final StateInternals<Void> sideInputStateInternals;
   private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
   private LongMin pushedBackWatermark = new LongMin();
@@ -93,17 +110,17 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private transient SideInputHandler sideInputHandler;
   private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
       Maps.newHashMapWithExpectedSize(5);
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
 
-  @Deprecated
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy,
       List<PCollectionView<?>> sideInputs,
       Coder<WindowedValue<InputT>> inputCoder,
-      StateInternalsFactory<Void> stateInternalsFactory
+      ApexStateBackend stateBackend
       ) {
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
     this.doFn = doFn;
@@ -111,7 +128,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     this.sideOutputTags = sideOutputTags;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
-    this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
+    this.sideInputStateInternals = new StateInternalsProxy<>(
+        stateBackend.newStateInternalsFactory(VoidCoder.of()));
 
     if (sideOutputTags.size() > sideOutputPorts.length) {
       String msg = String.format("Too many side outputs (currently only supporting %s).",
@@ -125,27 +143,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   }
 
-  public ApexParDoOperator(
-      ApexPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      WindowingStrategy<?, ?> windowingStrategy,
-      List<PCollectionView<?>> sideInputs,
-      Coder<WindowedValue<InputT>> inputCoder,
-      StateInternalsFactory<Void> stateInternalsFactory
-      ) {
-    this(
-        pipelineOptions,
-        DoFnAdapters.toOldDoFn(doFn),
-        mainOutputTag,
-        sideOutputTags,
-        windowingStrategy,
-        sideInputs,
-        inputCoder,
-        stateInternalsFactory);
-  }
-
   @SuppressWarnings("unused") // for Kryo
   private ApexParDoOperator() {
     this.pipelineOptions = null;
@@ -255,6 +252,17 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
     try {
       pushbackDoFnRunner.startBundle();
+      if (currentKeyStateInternals != null) {
+        InputT value = elem.getValue();
+        Object key;
+        if (value instanceof KeyedWorkItem) {
+          key = ((KeyedWorkItem) value).key();
+        } else {
+          key = ((KV) value).getKey();
+        }
+        ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
+        currentKeyTimerInternals.currentKey = key;
+      }
       Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
           .processElementInReadyWindows(elem);
       pushbackDoFnRunner.finishBundle();
@@ -305,6 +313,19 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       sideOutputPortMapping.put(sideOutputTags.get(i), port);
     }
 
+    NoOpStepContext stepContext = new NoOpStepContext() {
+
+      @Override
+      public StateInternals<?> stateInternals() {
+        return currentKeyStateInternals;
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        return currentKeyTimerInternals;
+      }
+
+    };
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         pipelineOptions.get(),
         doFn,
@@ -312,21 +333,47 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
         this,
         mainOutputTag,
         sideOutputTags,
-        new NoOpStepContext(),
+        stepContext,
         new NoOpAggregatorFactory(),
         windowingStrategy
         );
 
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    if (this.currentKeyStateInternals != null) {
+
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+              stepContext.timerInternals(), windowingStrategy);
+
+      @SuppressWarnings({"rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+              doFn, stepContext.stateInternals(), windowCoder);
+
+      doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
+          doFn,
+          doFnRunner,
+          stepContext,
+          new NoOpAggregatorFactory(),
+          windowingStrategy,
+          cleanupTimer,
+          stateCleaner);
+    }
+
     pushbackDoFnRunner =
         PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
-    try {
-      doFn.setup();
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    }
+  }
 
+  @Override
+  public void teardown() {
+    doFnInvoker.invokeTeardown();
+    super.teardown();
   }
 
   @Override
@@ -393,4 +440,70 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   }
 
+  private class CurrentKeyTimerInternals<K> implements TimerInternals {
+
+    private TimerInternalsFactory<K> factory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        InMemoryTimerInternals timerInternals = perKeyTimerInternals.get(key);
+        if (timerInternals == null) {
+          perKeyTimerInternals.put(key, timerInternals = new InMemoryTimerInternals());
+        }
+        return timerInternals;
+      }
+    };
+
+    // TODO: durable state store
+    final Map<K, InMemoryTimerInternals> perKeyTimerInternals = new HashMap<>();
+    private K currentKey;
+
+    @Override
+    public void setTimer(StateNamespace namespace, String timerId, Instant target,
+        TimeDomain timeDomain) {
+      factory.timerInternalsForKey(currentKey).setTimer(
+          namespace, timerId, target, timeDomain);
+    }
+
+    @Override
+    public void setTimer(TimerData timerData) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return new Instant(currentInputWatermark);
+    }
+
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
new file mode 100644
index 0000000..835c9e0
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator that applies an {@link ApexOperatorFn} to every tuple received on
+ * {@link #inputPort} and emits the results on {@link #outputPort}.
+ *
+ * <p>Used for simple native map operations, such as window assignment
+ * ({@link #assignWindows}) or turning {@link KV}s into {@link KeyedWorkItem}s.
+ */
+public class ApexProcessFnOperator<InputT> extends BaseOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApexProcessFnOperator.class);
+  private boolean traceTuples = false;
+  @Bind(JavaSerializer.class)
+  private final ApexOperatorFn<InputT> fn;
+
+  /**
+   * Creates an operator that runs {@code fn} on every input tuple.
+   *
+   * @param fn the processing logic; Kryo serializes it with Java serialization
+   * @param traceTuples whether each emitted tuple is logged at debug level
+   */
+  public ApexProcessFnOperator(ApexOperatorFn<InputT> fn, boolean traceTuples) {
+    super();
+    this.traceTuples = traceTuples;
+    this.fn = fn;
+  }
+
+  @SuppressWarnings("unused")
+  private ApexProcessFnOperator() {
+    // for Kryo
+    fn = null;
+  }
+
+  private final transient OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter =
+      new OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>>() {
+    @Override
+    public void emit(ApexStreamTuple<? extends WindowedValue<?>> tuple) {
+      if (traceTuples) {
+        LOG.debug("\nemitting {}\n", tuple);
+      }
+      outputPort.emit(tuple);
+    }
+  };
+
+  /**
+   * Something that emits results.
+   */
+  public interface OutputEmitter<T> {
+    void emit(T tuple);
+  }
+
+  /**
+   * The processing logic for this operator.
+   */
+  public interface ApexOperatorFn<InputT> extends Serializable {
+    void process(ApexStreamTuple<WindowedValue<InputT>> input,
+        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception;
+  }
+
+  /**
+   * Convert {@link KV} into {@link KeyedWorkItem}s.
+   *
+   * <p>Watermark tuples are forwarded unchanged. Every data element is exploded into one value
+   * per window, and each of those becomes a single-element {@link KeyedWorkItem} for its key.
+   */
+  public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
+    @Override
+    public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple,
+        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {
+
+      if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
+        outputEmitter.emit(tuple);
+      } else {
+        for (WindowedValue<KV<K, V>> in : tuple.getValue().explodeWindows()) {
+          KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(in.getValue().getKey(),
+              Collections.singletonList(in.withValue(in.getValue().getValue())));
+          outputEmitter.emit(ApexStreamTuple.DataTuple.of(in.withValue(kwi)));
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates an operator that assigns windows to every element using {@code windowFn}.
+   *
+   * @param windowFn the window function to apply
+   * @param options pipeline options; only used to decide whether tuples are traced
+   */
+  public static <T, W extends BoundedWindow> ApexProcessFnOperator<T> assignWindows(
+      WindowFn<T, W> windowFn, ApexPipelineOptions options) {
+    ApexOperatorFn<T> fn = new AssignWindows<T, W>(windowFn);
+    return new ApexProcessFnOperator<T>(fn, options.isTupleTracingEnabled());
+  }
+
+  /**
+   * Function for implementing {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+   *
+   * <p>Watermark tuples are forwarded unchanged. An input element may already belong to several
+   * windows, while {@link WindowFn.AssignContext#window()} exposes exactly one, so the element is
+   * exploded and windows are assigned separately for each window it is in.
+   */
+  private static class AssignWindows<T, W extends BoundedWindow> implements ApexOperatorFn<T> {
+    private final WindowFn<T, W> windowFn;
+
+    private AssignWindows(WindowFn<T, W> windowFn) {
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public final void process(ApexStreamTuple<WindowedValue<T>> tuple,
+        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception {
+      if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
+        outputEmitter.emit(tuple);
+        return;
+      }
+      for (final WindowedValue<T> input : tuple.getValue().explodeWindows()) {
+        Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public T element() {
+            return input.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return input.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            // safe: every value produced by explodeWindows() is in exactly one window
+            return Iterables.getOnlyElement(input.getWindows());
+          }
+        });
+        for (W w : windows) {
+          WindowedValue<T> wv = WindowedValue.of(input.getValue(), input.getTimestamp(),
+              w, input.getPane());
+          outputEmitter.emit(ApexStreamTuple.DataTuple.of(wv));
+        }
+      }
+    }
+  }
+
+  /**
+   * Input port. Each tuple is handed to the operator's {@link ApexOperatorFn}; checked
+   * exceptions thrown by it are rethrown wrapped in a {@link RuntimeException}.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> inputPort =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+      try {
+        fn.process(tuple, outputEmitter);
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+  };
+
+  /**
+   * Output port for the tuples emitted by the operator's {@link ApexOperatorFn}.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<? extends WindowedValue<?>>>
+    outputPort = new DefaultOutputPort<>();
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index c59afc5..cfc57cd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.apex.translation.utils;
 
+import com.datatorrent.netlet.util.Slice;
 import com.esotericsoftware.kryo.DefaultSerializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -27,7 +28,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
@@ -35,6 +38,7 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -42,6 +46,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
@@ -56,22 +61,18 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;
 
 /**
- * Implementation of {@link StateInternals} that can be serialized and
- * checkpointed with the operator. Suitable for small states, in the future this
- * should be based on the incremental state saving components in the Apex
- * library.
+ * Implementation of {@link StateInternals} for transient use.
+ *
+ * <p>For fields that need to be serialized, use {@link ApexStateInternalsFactory}
+ * or {@link StateInternalsProxy}.
  */
-@DefaultSerializer(JavaSerializer.class)
-public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
-  private static final long serialVersionUID = 1L;
-  public static <K> ApexStateInternals<K> forKey(K key) {
-    return new ApexStateInternals<>(key);
-  }
-
+public class ApexStateInternals<K> implements StateInternals<K> {
   private final K key;
+  private final Table<String, String, byte[]> stateTable; // (namespace, tag) -> coded value
 
-  protected ApexStateInternals(K key) {
+  protected ApexStateInternals(K key, Table<String, String, byte[]> stateTable) {
     this.key = key;
+    this.stateTable = stateTable; // stored by reference, not copied
   }
 
   @Override
@@ -79,11 +80,6 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
     return key;
   }
 
-  /**
-   * Serializable state for internals (namespace to state tag to coded value).
-   */
-  private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
-
   @Override
   public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
     return state(namespace, address, StateContexts.nullContext());
@@ -437,17 +433,54 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
   }
 
   /**
-   * Factory for {@link ApexStateInternals}.
+   * Factory for {@link ApexStateInternals} that can be serialized and checkpointed with the
+   * operator, together with the state of every key it has handed out. Suitable for small
+   * states; in the future this should be based on the incremental state saving components
+   * in the Apex library.
    *
    * @param <K> key type
    */
+  @DefaultSerializer(JavaSerializer.class)
   public static class ApexStateInternalsFactory<K>
       implements StateInternalsFactory<K>, Serializable {
     private static final long serialVersionUID = 1L;
+    /**
+     * Serializable state of all keys: encoded key to (namespace, state tag) to coded value.
+     */
+    private final Map<Slice, Table<String, String, byte[]>> perKeyState = new HashMap<>();
+    private final Coder<K> keyCoder;
+
+    private ApexStateInternalsFactory(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
 
     @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return ApexStateInternals.forKey(key);
+    public ApexStateInternals<K> stateInternalsForKey(K key) {
+      final Slice keyBytes;
+      try {
+        keyBytes = (key != null) ? new Slice(CoderUtils.encodeToByteArray(keyCoder, key))
+            : new Slice(null); // null keys are not passed to keyCoder
+      } catch (CoderException e) {
+        throw new RuntimeException("Failed to encode state key " + key, e);
+      }
+      Table<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
+      if (stateTable == null) {
+        stateTable = HashBasedTable.create();
+        perKeyState.put(keyBytes, stateTable);
+      }
+      return new ApexStateInternals<>(key, stateTable);
+    }
+
+  }
+
+  /**
+   * Serializable backend; each {@link #newStateInternalsFactory} call returns a fresh factory.
+   */
+  public static class ApexStateBackend implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public <K> ApexStateInternalsFactory<K> newStateInternalsFactory(Coder<K> keyCoder) {
+      return new ApexStateInternalsFactory<>(keyCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
new file mode 100644
index 0000000..1f28364
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import java.io.Serializable;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+
+/**
+ * State internals for reusable processing context.
+ *
+ * <p>Every state access is delegated to the {@link StateInternals} that the wrapped factory
+ * returns for the key last passed to {@link #setKey}. Only the factory is serialized; the
+ * current key is transient and has to be set again after deserialization.
+ *
+ * @param <K> key type
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final StateInternalsFactory<K> factory;
+  private transient K currentKey;
+
+  /**
+   * Creates a proxy whose state accesses are served by {@code factory}.
+   */
+  public StateInternalsProxy(ApexStateInternals.ApexStateInternalsFactory<K> factory) {
+    this.factory = factory;
+  }
+
+  /** Returns the factory that state accesses are delegated to. */
+  public StateInternalsFactory<K> getFactory() {
+    return this.factory;
+  }
+
+  /** Sets the key that subsequent state accesses refer to. */
+  public void setKey(K key) {
+    currentKey = key;
+  }
+
+  @Override
+  public K getKey() {
+    return currentKey;
+  }
+
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+    return factory.stateInternalsForKey(currentKey).state(namespace, address);
+  }
+
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address,
+      StateContext<?> c) {
+    return factory.stateInternalsForKey(currentKey).state(namespace, address, c);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index fb80d0c..4b73114 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -66,7 +66,7 @@ public class ApexGroupByKeyOperatorTest {
     input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
-        input, new ApexStateInternals.ApexStateInternalsFactory<String>()
+        input, new ApexStateInternals.ApexStateBackend()
         );
 
     operator.setup(null);

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 3bcba00..2760d06 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -216,7 +216,7 @@ public class ParDoTranslatorTest {
             WindowingStrategy.globalDefault(),
             Collections.<PCollectionView<?>>singletonList(singletonView),
             coder,
-            new ApexStateInternals.ApexStateInternalsFactory<Void>());
+            new ApexStateInternals.ApexStateBackend());
     operator.setup(null);
     operator.beginWindow(0);
     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 4f4ecfb..7160e45 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
 import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
@@ -76,7 +78,9 @@ public class ApexStateInternalsTest {
 
   @Before
   public void initStateInternals() {
-    underTest = new ApexStateInternals<>(null);
+    underTest = new ApexStateBackend()
+        .newStateInternalsFactory(StringUtf8Coder.of())
+        .stateInternalsForKey((String) null);
   }
 
   @Test
@@ -344,16 +348,21 @@ public class ApexStateInternalsTest {
 
   @Test
   public void testSerialization() throws Exception {
-    ApexStateInternals<String> original = new ApexStateInternals<String>(null);
-    ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    ApexStateInternalsFactory<String> sif = new ApexStateBackend()
+        .newStateInternalsFactory(StringUtf8Coder.of());
+    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+    // the factory holds the per-key state, so it is the object that gets cloned below
+    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
     value.write("hello");
 
-    ApexStateInternals<String> cloned;
-    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(original));
-    ValueState<String> clonedValue = cloned.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    ApexStateInternalsFactory<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+    // the clone must still hold the value written for the same key
+    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
     assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-    assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index d95ed7c..fe96eb1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
- * Singleton keyed word iteam coder.
+ * Singleton keyed work item coder.
  * @param <K>
  * @param <ElemT>
  */


[2/2] beam git commit: This closes #2450

Posted by jk...@apache.org.
This closes #2450


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f55746c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f55746c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f55746c

Branch: refs/heads/master
Commit: 7f55746ce3eb2a0b6ee2f6372d6f7127835cbb34
Parents: 1594849 8484ef9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 18:26:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 18:26:15 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   2 -
 .../apex/translation/GroupByKeyTranslator.java  |   4 +-
 .../apex/translation/ParDoTranslator.java       |  15 +-
 .../apex/translation/TranslationContext.java    |  16 +-
 .../translation/WindowAssignTranslator.java     |  58 ++----
 .../operators/ApexGroupByKeyOperator.java       |  21 +--
 .../operators/ApexParDoOperator.java            | 187 +++++++++++++++----
 .../operators/ApexProcessFnOperator.java        | 184 ++++++++++++++++++
 .../translation/utils/ApexStateInternals.java   |  73 ++++++--
 .../translation/utils/StateInternalsProxy.java  |  67 +++++++
 .../translation/ApexGroupByKeyOperatorTest.java |   2 +-
 .../apex/translation/ParDoTranslatorTest.java   |   2 +-
 .../utils/ApexStateInternalsTest.java           |  25 ++-
 .../streaming/SingletonKeyedWorkItemCoder.java  |   2 +-
 14 files changed, 512 insertions(+), 146 deletions(-)
----------------------------------------------------------------------