You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/18 20:08:29 UTC

[29/50] [abbrv] beam git commit: ApexRunner SDF support

ApexRunner SDF support


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5efca02
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5efca02
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5efca02

Branch: refs/heads/gearpump-runner
Commit: f5efca0292eb9ace2d6b4895bc08e94344854336
Parents: 69522fe
Author: Thomas Weise <th...@apache.org>
Authored: Sat Apr 8 13:58:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 15 17:04:20 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  34 +++++
 .../translation/ApexPipelineTranslator.java     |  23 ++++
 .../apex/translation/ParDoTranslator.java       |  68 +++++++++-
 .../operators/ApexGroupByKeyOperator.java       |   4 +-
 .../operators/ApexParDoOperator.java            | 124 ++++++++++++++++++-
 .../operators/ApexProcessFnOperator.java        |   8 +-
 .../operators/ApexTimerInternals.java           |  21 +++-
 .../translation/utils/ApexStateInternals.java   |   4 +
 .../translation/utils/StateInternalsProxy.java  |   7 +-
 .../operators/ApexTimerInternalsTest.java       |   8 +-
 10 files changed, 286 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 366308e..2fd0b22 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -31,6 +31,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.apex.api.EmbeddedAppLauncher;
@@ -38,9 +39,11 @@ import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PrimitiveCreate;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -51,18 +54,23 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.AsIterable;
 import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -112,6 +120,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
             PTransformOverride.of(
                 PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
                 new StreamingCombineGloballyAsSingletonView.Factory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.splittableParDoMulti(),
+                new SplittableParDoOverrideFactory<>()))
         .build();
   }
 
@@ -424,4 +436,26 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
+  /**
+   * A {@link PTransformOverrideFactory} that overrides a
+   * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with
+   * {@link SplittableParDo}.
+   */
+  static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<
+      PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
+          transform) {
+      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
+          new SplittableParDo<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
+        PCollectionTuple newOutput) {
+      return ReplacementOutputs.tagged(outputs, newOutput);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index fdeefc7..b3a6d1c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
+import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
 import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.construction.PrimitiveCreate;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
@@ -60,6 +63,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
   static {
     // register TransformTranslators
     registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
+    registerTransformTranslator(SplittableParDo.ProcessElements.class,
+        new ParDoTranslator.SplittableProcessElementsTranslator());
+    registerTransformTranslator(SplittableParDo.GBKIntoKeyedWorkItems.class,
+        new GBKIntoKeyedWorkItemsTranslator());
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
@@ -174,4 +181,20 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
       LOG.debug("view {}", view.getName());
     }
   }
+
+  private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+    implements TransformTranslator<SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+    @Override
+    public void translate(
+        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, TranslationContext context) {
+      // https://issues.apache.org/jira/browse/BEAM-1850
+      ApexProcessFnOperator<KV<K, InputT>> operator = ApexProcessFnOperator.toKeyedWorkItems(
+          context.getPipelineOptions());
+      context.addOperator(operator, operator.outputPort);
+      context.addStream(context.getInput(), operator.inputPort);
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 2e3d902..9133cb6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -30,11 +30,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -83,7 +85,7 @@ class ParDoTranslator<InputT, OutputT>
     }
 
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
-    PCollection<InputT> input = (PCollection<InputT>) context.getInput();
+    PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
     WindowedValueCoder<InputT> wvInputCoder =
@@ -130,6 +132,70 @@ class ParDoTranslator<InputT, OutputT>
     }
   }
 
+  static class SplittableProcessElementsTranslator<InputT, OutputT,
+      RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    implements TransformTranslator<SplittableParDo.ProcessElements<InputT, OutputT,
+      RestrictionT, TrackerT>> {
+
+    @Override
+    public void translate(
+        SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+        TranslationContext context) {
+
+      Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+      PCollection<InputT> input = context.getInput();
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+      Coder<InputT> inputCoder = input.getCoder();
+      WindowedValueCoder<InputT> wvInputCoder =
+          FullWindowedValueCoder.of(
+              inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      DoFn<InputT, OutputT> doFn = (DoFn) transform.newProcessFn(transform.getFn());
+      ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+              context.getPipelineOptions(),
+              doFn,
+              transform.getMainOutputTag(),
+              transform.getAdditionalOutputTags().getAll(),
+              input.getWindowingStrategy(),
+              sideInputs,
+              wvInputCoder,
+              context.getStateBackend());
+
+      Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
+      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        checkArgument(
+            output.getValue() instanceof PCollection,
+            "%s %s outputs non-PCollection %s of type %s",
+            ParDo.MultiOutput.class.getSimpleName(),
+            context.getFullName(),
+            output.getValue(),
+            output.getValue().getClass().getSimpleName());
+        PCollection<?> pc = (PCollection<?>) output.getValue();
+        if (output.getKey().equals(transform.getMainOutputTag())) {
+          ports.put(pc, operator.output);
+        } else {
+          int portIndex = 0;
+          for (TupleTag<?> tag : transform.getAdditionalOutputTags().getAll()) {
+            if (tag.equals(output.getKey())) {
+              ports.put(pc, operator.additionalOutputPorts[portIndex]);
+              break;
+            }
+            portIndex++;
+          }
+        }
+      }
+
+      context.addOperator(operator, ports);
+      context.addStream(context.getInput(), operator.input);
+      if (!sideInputs.isEmpty()) {
+        addSideInputs(operator.sideInput1, sideInputs, context);
+      }
+
+    }
+  }
+
+
   static void addSideInputs(
       Operator.InputPort<?> sideInputPort,
       List<PCollectionView<?>> sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 85836ad..1d48e20 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -202,7 +202,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
         windowedValue.getTimestamp(),
         windowedValue.getWindows(),
         windowedValue.getPane());
-    timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark);
+    timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark, null);
     ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
         newReduceFnRunner(kv.getKey());
     reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
@@ -211,7 +211,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
 
   @Override
   public void fireTimer(K key, Collection<TimerData> timerData) {
-    timerInternals.setContext(key, keyCoder, inputWatermark);
+    timerInternals.setContext(key, keyCoder, inputWatermark, null);
     ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
     try {
       reduceFnRunner.onTimers(timerData);

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 8c516b1..7fee0d5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.apex.translation.operators;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -28,8 +31,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
@@ -44,20 +49,33 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -65,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +91,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Apex operator for Beam {@link DoFn}.
  */
-public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager,
+    ApexTimerInternals.TimerProcessor<Object> {
   private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
   private boolean traceTuples = true;
 
@@ -139,6 +159,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     TimerInternals.TimerDataCoder timerCoder =
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
     this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
+
+    if (doFn instanceof SplittableParDo.ProcessFn) {
+      // we know that it is keyed on String
+      Coder<?> keyCoder = StringUtf8Coder.of();
+      this.currentKeyStateInternals = new StateInternalsProxy<>(
+          stateBackend.newStateInternalsFactory(keyCoder));
+    }
+
   }
 
   @SuppressWarnings("unused") // for Kryo
@@ -272,7 +300,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
           keyCoder = kwiCoder.getKeyCoder();
         }
         ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
-        currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark));
+        currentKeyTimerInternals.setContext(key, keyCoder,
+            new Instant(this.currentInputWatermark),
+            new Instant(this.currentOutputWatermark)
+            );
       }
       Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
           .processElementInReadyWindows(elem);
@@ -286,9 +317,47 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     }
   }
 
+  @Override
+  public void fireTimer(Object key, Collection<TimerData> timerDataSet) {
+    pushbackDoFnRunner.startBundle();
+    @SuppressWarnings("unchecked")
+    Coder<Object> keyCoder = (Coder) currentKeyStateInternals.getKeyCoder();
+    ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
+    currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark),
+        new Instant(this.currentOutputWatermark));
+    for (TimerData timerData : timerDataSet) {
+      StateNamespace namespace = timerData.getNamespace();
+      checkArgument(namespace instanceof WindowNamespace);
+      BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow();
+      pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
+          timerData.getTimestamp(), timerData.getDomain());
+    }
+    pushbackDoFnRunner.finishBundle();
+  }
+
   private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
     this.currentInputWatermark = mark.getTimestamp();
-
+    long minEventTimeTimer = currentKeyTimerInternals.fireReadyTimers(
+        this.currentInputWatermark,
+        this, TimeDomain.EVENT_TIME);
+
+    checkState(minEventTimeTimer >= currentInputWatermark,
+        "Event time timer processing generates new timer(s) behind watermark.");
+    //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+    //    currentInputWatermark);
+
+    // TODO: is this the right way to trigger processing time timers?
+    // drain all timers below current watermark, including those that result from firing
+    long minProcessingTimeTimer = Long.MIN_VALUE;
+    while (minProcessingTimeTimer < currentInputWatermark) {
+      minProcessingTimeTimer = currentKeyTimerInternals.fireReadyTimers(
+        this.currentInputWatermark,
+        this, TimeDomain.PROCESSING_TIME);
+      if (minProcessingTimeTimer < currentInputWatermark) {
+        LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+            currentInputWatermark);
+      }
+    }
     if (sideInputs.isEmpty()) {
       if (traceTuples) {
         LOG.debug("\nemitting watermark {}\n", mark);
@@ -376,6 +445,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     pushbackDoFnRunner =
         SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
+    if (doFn instanceof SplittableParDo.ProcessFn) {
+
+      @SuppressWarnings("unchecked")
+      StateInternalsFactory<String> stateInternalsFactory =
+          (StateInternalsFactory<String>) this.currentKeyStateInternals.getFactory();
+
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      SplittableParDo.ProcessFn<InputT, OutputT, Object, RestrictionTracker<Object>>
+        splittableDoFn = (SplittableParDo.ProcessFn) doFn;
+      splittableDoFn.setStateInternalsFactory(stateInternalsFactory);
+      TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
+         @Override
+         public TimerInternals timerInternalsForKey(String key) {
+           return currentKeyTimerInternals;
+          }
+        };
+      splittableDoFn.setTimerInternalsFactory(timerInternalsFactory);
+      splittableDoFn.setProcessElementInvoker(
+          new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+              doFn,
+              pipelineOptions.get(),
+              new OutputWindowedValue<OutputT>() {
+                @Override
+                public void outputWindowedValue(
+                    OutputT output,
+                    Instant timestamp,
+                    Collection<? extends BoundedWindow> windows,
+                    PaneInfo pane) {
+                  output(
+                      mainOutputTag,
+                      WindowedValue.of(output, timestamp, windows, pane));
+                }
+
+                @Override
+                public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag,
+                    AdditionalOutputT output, Instant timestamp,
+                    Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+                  output(tag, WindowedValue.of(output, timestamp, windows, pane));
+                }
+              },
+              sideInputReader,
+              Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+              10000,
+              Duration.standardSeconds(10)));
+    }
+
   }
 
   @Override
@@ -390,6 +505,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   @Override
   public void endWindow() {
+    currentKeyTimerInternals.fireReadyTimers(
+        currentKeyTimerInternals.currentProcessingTime().getMillis(),
+        this, TimeDomain.PROCESSING_TIME);
   }
 
   private static class LongMin {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
index 835c9e0..9f4e6c5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
@@ -91,7 +91,13 @@ public class ApexProcessFnOperator<InputT> extends BaseOperator {
   /**
    * Convert {@link KV} into {@link KeyedWorkItem}s.
    */
-  public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
+  public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems(
+      ApexPipelineOptions options) {
+    ApexOperatorFn<KV<K, V>> fn = new ToKeyedWorkItems<>();
+    return new ApexProcessFnOperator<KV<K, V>>(fn, options.isTupleTracingEnabled());
+  }
+
+  private static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
     @Override
     public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple,
         OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index 1eb224c..0a0dd50 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -55,6 +55,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
 
   private transient K currentKey;
   private transient Instant currentInputWatermark;
+  private transient Instant currentOutputWatermark;
   private transient Coder<K> keyCoder;
 
   public ApexTimerInternals(TimerDataCoder timerDataCoder) {
@@ -62,10 +63,12 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
     this.processingTimeTimers = new TimerSet(timerDataCoder);
   }
 
-  public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
+  public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark,
+      Instant outputWatermark) {
     this.currentKey = key;
     this.keyCoder = keyCoder;
     this.currentInputWatermark = inputWatermark;
+    this.currentOutputWatermark = outputWatermark;
   }
 
   @VisibleForTesting
@@ -118,7 +121,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
 
   @Override
   public Instant currentOutputWatermarkTime() {
-    return null;
+    return currentOutputWatermark;
   }
 
   public interface TimerProcessor<K> {
@@ -128,11 +131,19 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
   /**
    * Fire the timers that are ready. These are the timers
    * that are registered to be triggered at a time before the current time.
+   * Timer processing may register new timers, which can cause the returned
+   * timestamp to be before the current time. The caller may repeat
+   * the call until such backdated timers are cleared.
+   * @return minimum timestamp of registered timers.
    */
-  public void fireReadyTimers(long currentTime,
+  public long fireReadyTimers(long currentTime,
       TimerProcessor<K> timerProcessor, TimeDomain timeDomain) {
     TimerSet timers = getTimerSet(timeDomain);
 
+    // move minTimestamp first,
+    // timer additions that result from firing may modify it
+    timers.minTimestamp = currentTime;
+
     // we keep the timers to return in a different list and launch them later
     // because we cannot prevent a trigger from registering another timer,
     // which would lead to concurrent modification exception.
@@ -173,6 +184,8 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
         }
       }
     }
+
+    return timers.minTimestamp;
   }
 
   private Slice getKeyBytes(K key) {
@@ -186,6 +199,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
   protected static class TimerSet implements Serializable {
     private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
     private final TimerDataCoder timerDataCoder;
+    private long minTimestamp = Long.MAX_VALUE;
 
     protected TimerSet(TimerDataCoder timerDataCoder) {
       this.timerDataCoder = timerDataCoder;
@@ -205,6 +219,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
       }
 
       activeTimers.put(keyBytes, timersForKey);
+      this.minTimestamp = Math.min(minTimestamp, timer.getTimestamp().getMillis());
     }
 
     public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index eeea6d1..18ea8e4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -438,6 +438,10 @@ public class ApexStateInternals<K> implements StateInternals {
       this.keyCoder = keyCoder;
     }
 
+    public Coder<K> getKeyCoder() {
+      return this.keyCoder;
+    }
+
     @Override
     public ApexStateInternals<K> stateInternalsForKey(K key) {
       final Slice keyBytes;

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
index ccf7e43..b652c68 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateContext;
 
@@ -34,7 +35,7 @@ import org.apache.beam.sdk.state.StateContext;
 @DefaultSerializer(JavaSerializer.class)
 public class StateInternalsProxy<K> implements StateInternals, Serializable {
 
-  private final StateInternalsFactory<K> factory;
+  private final ApexStateInternals.ApexStateInternalsFactory<K> factory;
   private transient K currentKey;
 
   public StateInternalsProxy(ApexStateInternals.ApexStateInternalsFactory<K> factory) {
@@ -45,6 +46,10 @@ public class StateInternalsProxy<K> implements StateInternals, Serializable {
     return this.factory;
   }
 
+  public Coder<K> getKeyCoder() {
+    return factory.getKeyCoder();
+  }
+
   public void setKey(K key) {
     currentKey = key;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 7b52223..ba1c801 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -60,7 +60,7 @@ public class ApexTimerInternalsTest {
     Instant instant2 = new Instant(2);
 
     ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
-    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);
 
     TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
         instant0, TimeDomain.EVENT_TIME);
@@ -98,7 +98,7 @@ public class ApexTimerInternalsTest {
     Instant instant1 = new Instant(1);
 
     ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
-    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);
 
     TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
         instant0, TimeDomain.EVENT_TIME);
@@ -133,11 +133,11 @@ public class ApexTimerInternalsTest {
         new Instant(0), TimeDomain.EVENT_TIME);
     String key = "key";
     ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
-    timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now());
+    timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
     timerInternals.setTimer(timerData);
     ApexTimerInternals<String> cloned;
     assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals));
-    cloned.setContext(key, StringUtf8Coder.of(), Instant.now());
+    cloned.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
     Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap();
     assertEquals(1, timers.size());
   }