You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/08 03:04:38 UTC

[5/6] incubator-beam git commit: BEAM-261 Make translators package private.

BEAM-261 Make translators package private.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5553c603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5553c603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5553c603

Branch: refs/heads/apex-runner
Commit: 5553c603a0c48855d38d4702f19e905eac2034f2
Parents: 9197d1e
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 27 16:19:15 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Nov 7 22:33:46 2016 +0100

----------------------------------------------------------------------
 .../runners/apex/ApexPipelineTranslator.java    | 185 -------
 .../apache/beam/runners/apex/ApexRunner.java    |  13 +-
 .../beam/runners/apex/ApexRunnerResult.java     |   2 +-
 .../translation/ApexPipelineTranslator.java     | 179 +++++++
 .../translation/CreateValuesTranslator.java     |  48 ++
 .../FlattenPCollectionTranslator.java           | 129 +++++
 .../apex/translation/GroupByKeyTranslator.java  |  42 ++
 .../translation/ParDoBoundMultiTranslator.java  | 142 ++++++
 .../apex/translation/ParDoBoundTranslator.java  |  64 +++
 .../translation/ReadUnboundedTranslator.java    |  42 ++
 .../apex/translation/TransformTranslator.java   |  31 ++
 .../apex/translation/TranslationContext.java    | 178 +++++++
 .../operators/ApexFlattenOperator.java          | 125 +++++
 .../operators/ApexGroupByKeyOperator.java       | 478 +++++++++++++++++++
 .../operators/ApexParDoOperator.java            | 375 +++++++++++++++
 .../ApexReadUnboundedInputOperator.java         | 155 ++++++
 .../translation/operators/package-info.java     |  22 +
 .../runners/apex/translation/package-info.java  |  22 +
 .../translation/utils/ApexStateInternals.java   | 438 +++++++++++++++++
 .../apex/translation/utils/ApexStreamTuple.java | 222 +++++++++
 .../utils/CoderAdapterStreamCodec.java          |  69 +++
 .../apex/translation/utils/NoOpStepContext.java |  72 +++
 .../utils/SerializablePipelineOptions.java      |  60 +++
 .../utils/ValueAndCoderKryoSerializable.java    |  77 +++
 .../apex/translation/utils/ValuesSource.java    | 149 ++++++
 .../apex/translation/utils/package-info.java    |  22 +
 .../translators/CreateValuesTranslator.java     |  48 --
 .../FlattenPCollectionTranslator.java           | 129 -----
 .../apex/translators/GroupByKeyTranslator.java  |  42 --
 .../translators/ParDoBoundMultiTranslator.java  | 142 ------
 .../apex/translators/ParDoBoundTranslator.java  |  64 ---
 .../translators/ReadUnboundedTranslator.java    |  42 --
 .../apex/translators/TransformTranslator.java   |  31 --
 .../apex/translators/TranslationContext.java    | 178 -------
 .../functions/ApexFlattenOperator.java          | 125 -----
 .../functions/ApexGroupByKeyOperator.java       | 478 -------------------
 .../functions/ApexParDoOperator.java            | 375 ---------------
 .../translators/functions/package-info.java     |  22 -
 .../io/ApexReadUnboundedInputOperator.java      | 154 ------
 .../apex/translators/io/ValuesSource.java       | 149 ------
 .../apex/translators/io/package-info.java       |  22 -
 .../runners/apex/translators/package-info.java  |  22 -
 .../translators/utils/ApexStateInternals.java   | 438 -----------------
 .../apex/translators/utils/ApexStreamTuple.java | 222 ---------
 .../utils/CoderAdapterStreamCodec.java          |  69 ---
 .../apex/translators/utils/NoOpStepContext.java |  72 ---
 .../utils/SerializablePipelineOptions.java      |  60 ---
 .../utils/ValueAndCoderKryoSerializable.java    |  77 ---
 .../apex/translators/utils/package-info.java    |  22 -
 .../translation/ApexGroupByKeyOperatorTest.java | 112 +++++
 .../FlattenPCollectionTranslatorTest.java       |  99 ++++
 .../translation/GroupByKeyTranslatorTest.java   | 246 ++++++++++
 .../translation/ParDoBoundTranslatorTest.java   | 340 +++++++++++++
 .../translation/ReadUnboundTranslatorTest.java  | 129 +++++
 .../utils/ApexStateInternalsTest.java           | 361 ++++++++++++++
 .../translation/utils/CollectionSource.java     | 136 ++++++
 .../translation/utils/PipelineOptionsTest.java  |  84 ++++
 .../translators/ApexGroupByKeyOperatorTest.java | 112 -----
 .../FlattenPCollectionTranslatorTest.java       |  99 ----
 .../translators/GroupByKeyTranslatorTest.java   | 246 ----------
 .../translators/ParDoBoundTranslatorTest.java   | 340 -------------
 .../translators/ReadUnboundTranslatorTest.java  | 129 -----
 .../utils/ApexStateInternalsTest.java           | 361 --------------
 .../translators/utils/CollectionSource.java     | 136 ------
 .../translators/utils/PipelineOptionsTest.java  |  84 ----
 65 files changed, 4653 insertions(+), 4685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
deleted file mode 100644
index 8a87ce0..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex;
-
-import com.datatorrent.api.DAG;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
-import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
-import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
-import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.apex.translators.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
-import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
-import org.apache.beam.runners.apex.translators.TransformTranslator;
-import org.apache.beam.runners.apex.translators.TranslationContext;
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
- * into Apex logical plan {@link DAG}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
-  private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
-
-  /**
-   * A map from {@link PTransform} subclass to the corresponding
-   * {@link TransformTranslator} to use to translate that transform.
-   */
-  private static final Map<Class<? extends PTransform>, TransformTranslator>
-      transformTranslators = new HashMap<>();
-
-  private final TranslationContext translationContext;
-
-  static {
-    // register TransformTranslators
-    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
-    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
-    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
-    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
-    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
-    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
-        new FlattenPCollectionTranslator());
-    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
-    registerTransformTranslator(CreateApexPCollectionView.class,
-        new CreateApexPCollectionViewTranslator());
-    registerTransformTranslator(CreatePCollectionView.class,
-        new CreatePCollectionViewTranslator());
-  }
-
-  public ApexPipelineTranslator(TranslationContext translationContext) {
-    this.translationContext = translationContext;
-  }
-
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
-    LOG.debug("entering composite transform {}", node.getTransform());
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    LOG.debug("leaving composite transform {}", node.getTransform());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
-    LOG.debug("visiting transform {}", node.getTransform());
-    PTransform transform = node.getTransform();
-    TransformTranslator translator = getTransformTranslator(transform.getClass());
-    if (null == translator) {
-      throw new UnsupportedOperationException(
-          "no translator registered for " + transform);
-    }
-    translationContext.setCurrentTransform(node);
-    translator.translate(transform, translationContext);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    LOG.debug("visiting value {}", value);
-  }
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be translated by default by the corresponding
-   * {@link TransformTranslator}.
-   */
-  private static <TransformT extends PTransform> void registerTransformTranslator(
-      Class<TransformT> transformClass,
-      TransformTranslator<? extends TransformT> transformTranslator) {
-    if (transformTranslators.put(transformClass, transformTranslator) != null) {
-      throw new IllegalArgumentException(
-          "defining multiple translators for " + transformClass);
-    }
-  }
-
-  /**
-   * Returns the {@link TransformTranslator} to use for instances of the
-   * specified PTransform class, or null if none registered.
-   */
-  private <TransformT extends PTransform<?, ?>>
-  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
-    return transformTranslators.get(transformClass);
-  }
-
-  private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void translate(Read.Bounded<T> transform, TranslationContext context) {
-      // TODO: adapter is visibleForTesting
-      BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
-          transform.getSource());
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, true, context.getPipelineOptions());
-      context.addOperator(operator, operator.output);
-    }
-
-  }
-
-  private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
-      implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
-        TranslationContext context) {
-      PCollectionView<ViewT> view = transform.getView();
-      context.addView(view);
-      LOG.debug("view {}", view.getName());
-    }
-  }
-
-  private static class CreatePCollectionViewTranslator<ElemT, ViewT>
-      implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void translate(CreatePCollectionView<ElemT, ViewT> transform,
-        TranslationContext context) {
-      PCollectionView<ViewT> view = transform.getView();
-      context.addView(view);
-      LOG.debug("view {}", view.getName());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 661308d..b42dddf 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
@@ -118,17 +118,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   @Override
-  public ApexRunnerResult run(Pipeline pipeline) {
+  public ApexRunnerResult run(final Pipeline pipeline) {
 
-    final TranslationContext translationContext = new TranslationContext(options);
-    ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
-    translator.translate(pipeline);
+    final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
 
     StreamingApplication apexApp = new StreamingApplication() {
       @Override
       public void populateDAG(DAG dag, Configuration conf) {
         dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
-        translationContext.populateDAG(dag);
+        translator.translate(pipeline, dag);
       }
     };
 
@@ -352,9 +350,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   /**
    * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
    * They require the input {@link PCollection} fits in memory.
    * For a large {@link PCollection} this is expected to crash!
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 03428a6..3ae69f2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -79,7 +79,7 @@ public class ApexRunnerResult implements PipelineResult {
 
   /**
    * Return the DAG executed by the pipeline.
-   * @return
+   * @return DAG from translation.
    */
   public DAG getApexDAG() {
     return apexDAG;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
new file mode 100644
index 0000000..d38faf7
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.DAG;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
+ * into Apex logical plan {@link DAG}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
+
+  /**
+   * A map from {@link PTransform} subclass to the corresponding
+   * {@link TransformTranslator} to use to translate that transform.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  private final TranslationContext translationContext;
+
+  static {
+    // register TransformTranslators
+    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+        new FlattenPCollectionTranslator());
+    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+    registerTransformTranslator(CreateApexPCollectionView.class,
+        new CreateApexPCollectionViewTranslator());
+    registerTransformTranslator(CreatePCollectionView.class,
+        new CreatePCollectionViewTranslator());
+  }
+
+  public ApexPipelineTranslator(ApexPipelineOptions options) {
+    this.translationContext = new TranslationContext(options);
+  }
+
+  public void translate(Pipeline pipeline, DAG dag) {
+    pipeline.traverseTopologically(this);
+    translationContext.populateDAG(dag);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform transform = node.getTransform();
+    TransformTranslator translator = getTransformTranslator(transform.getClass());
+    if (null == translator) {
+      throw new UnsupportedOperationException(
+          "no translator registered for " + transform);
+    }
+    translationContext.setCurrentTransform(node);
+    translator.translate(transform, translationContext);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Registers the {@link TransformTranslator} that translates instances of the
+   * given {@link PTransform} class; each class may be registered only once.
+   * @throws IllegalArgumentException if the class already had a translator registered
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    if (transformTranslators.put(transformClass, transformTranslator) != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} registered for exactly the given PTransform
+   * class (superclasses are not consulted), or null if none is registered.
+   */
+  private <TransformT extends PTransform<?, ?>>
+  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+
+  private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(Read.Bounded<T> transform, TranslationContext context) {
+      // TODO: adapter is visibleForTesting
+      BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
+          transform.getSource());
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, true, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    }
+
+  }
+
+  private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
+      implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+        TranslationContext context) {
+      PCollectionView<ViewT> view = transform.getView();
+      context.addView(view);
+      LOG.debug("view {}", view.getName());
+    }
+  }
+
+  private static class CreatePCollectionViewTranslator<ElemT, ViewT>
+      implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(CreatePCollectionView<ElemT, ViewT> transform,
+        TranslationContext context) {
+      PCollectionView<ViewT> view = transform.getView();
+      context.addView(view);
+      LOG.debug("view {}", view.getName());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
new file mode 100644
index 0000000..ceae2b5
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+
+
+/**
+ * Translates {@link Create.Values} by wrapping its elements into an {@link UnboundedSource}
+ * that is read by an {@link ApexReadUnboundedInputOperator}. Mainly used for testing.
+ */
+class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+  private static final long serialVersionUID = 1451000241832745629L;
+
+  @Override
+  public void translate(Create.Values<T> transform, TranslationContext context) {
+    try {
+      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
+          transform.getDefaultOutputCoder((PBegin) context.getInput()));
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..eb24af9
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+/**
+ * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
+ */
+class FlattenPCollectionTranslator<T> implements
+    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+    PCollectionList<T> input = context.getInput();
+    List<PCollection<T>> collections = input.getAll();
+
+    if (collections.isEmpty()) {
+      // create a dummy source that never emits anything
+      @SuppressWarnings("unchecked")
+      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
+          (Coder<T>) VoidCoder.of());
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    } else {
+      PCollection<T> output = context.getOutput();
+      Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
+      flattenCollections(collections, unionTags, output, context);
+    }
+  }
+
+  /**
+   * Flattens the given collections into the given result collection using a cascade
+   * of merge operators with 2 input ports each. The optional union tags identify the
+   * source in the result stream, used to channel multiple side inputs to a single Apex
+   * operator port. NOTE(review): a single input collection adds no operator - confirm.
+   *
+   * @param collections the collections to merge; merged pairs must have equal coders
+   * @param unionTags tag per collection; a collection without an entry is tagged 0
+   * @param finalCollection the collection registered as output of the last merge
+   * @param context the translation context that operators and streams are added to
+   */
+  static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
+      Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
+    List<PCollection<T>> remainingCollections = Lists.newArrayList();
+    PCollection<T> firstCollection = null;
+    while (!collections.isEmpty()) {
+      for (PCollection<T> collection : collections) {
+        if (null == firstCollection) {
+          firstCollection = collection;
+        } else {
+          ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
+          context.addStream(firstCollection, operator.data1);
+          Integer unionTag = unionTags.get(firstCollection);
+          operator.data1Tag = (unionTag != null) ? unionTag : 0;
+          context.addStream(collection, operator.data2);
+          unionTag = unionTags.get(collection);
+          operator.data2Tag = (unionTag != null) ? unionTag : 0;
+
+          if (!collection.getCoder().equals(firstCollection.getCoder())) {
+              throw new UnsupportedOperationException("coders don't match");
+          }
+
+          if (collections.size() > 2) {
+            PCollection<T> intermediateCollection = intermediateCollection(collection,
+                collection.getCoder());
+            context.addOperator(operator, operator.out, intermediateCollection);
+            remainingCollections.add(intermediateCollection);
+          } else {
+            // only two collections at this level: this merge produces the final result
+            context.addOperator(operator, operator.out, finalCollection);
+          }
+          firstCollection = null;
+        }
+      }
+      if (firstCollection != null) {
+        // collection left without a partner: carry it over to the next merge level
+        remainingCollections.add(firstCollection);
+        firstCollection = null;
+      }
+      if (remainingCollections.size() > 1) {
+        collections = remainingCollections;
+        remainingCollections = Lists.newArrayList();
+      } else {
+        collections = Lists.newArrayList();
+      }
+    }
+  }
+
+  static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
+    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+        input.getWindowingStrategy(), input.isBounded());
+    output.setCoder(outputCoder);
+    return output;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..47d447a
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates a {@link GroupByKey} into an {@link ApexGroupByKeyOperator}.
+ *
+ * <p>The operator is registered with the {@link TranslationContext} as the producer of the
+ * transform's output, and its input port is attached to the stream of the transform's input
+ * collection.
+ */
+class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    PCollection<KV<K, V>> keyedInput = context.getInput();
+    ApexGroupByKeyOperator<K, V> groupOperator =
+        new ApexGroupByKeyOperator<>(
+            context.getPipelineOptions(), keyedInput, context.<K>stateInternalsFactory());
+    // register the operator for the current output first, then wire its input
+    context.addOperator(groupOperator, groupOperator.output);
+    context.addStream(keyedInput, groupOperator.input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..7c91b91
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
+ *
+ * <p>Every output collection of the transform is mapped to an output port of the operator: the
+ * main output to {@code operator.output} and each side output to the element of
+ * {@code operator.sideOutputPorts} at the position of its tag in the transform's side output
+ * tag list.
+ */
+class ParDoBoundMultiTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
+
+  /**
+   * Creates the {@link ApexParDoOperator} for the transform, registers it with its output ports
+   * and connects the main input and, if present, the side inputs.
+   *
+   * @param transform the multi-output ParDo to translate
+   * @param context the translation context that collects operators and streams
+   */
+  @Override
+  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    PCollectionTuple output = context.getOutput();
+    PCollection<InputT> input = context.getInput();
+    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Coder<InputT> inputCoder = input.getCoder();
+    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
+        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
+        input.getWindowingStrategy(), sideInputs, wvInputCoder,
+        context.<Void>stateInternalsFactory()
+        );
+
+    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
+    Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      // compare tags by value, not by reference, so that equal tag instances also match
+      if (transform.getMainOutputTag().equals(outputEntry.getKey())) {
+        ports.put(outputEntry.getValue(), operator.output);
+      } else {
+        // the side output port index is the position of the tag in the side output tag list
+        int portIndex = 0;
+        for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
+          if (tag.equals(outputEntry.getKey())) {
+            ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+            break;
+          }
+          portIndex++;
+        }
+      }
+    }
+    context.addOperator(operator, ports);
+    context.addStream(input, operator.input);
+    if (!sideInputs.isEmpty()) {
+      addSideInputs(operator, sideInputs, context);
+    }
+  }
+
+  /**
+   * Connects the collections backing the given side inputs to the side input port of the
+   * operator.
+   *
+   * <p>Only {@code operator.sideInput1} is used as side input port. When there are more side
+   * inputs than ports, the side input collections are first merged into a single union
+   * collection, which is then connected to that port.
+   *
+   * @param operator the operator that consumes the side inputs
+   * @param sideInputs the side input views of the transform
+   * @param context the translation context that collects operators and streams
+   */
+  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+    if (sideInputs.size() > sideInputPorts.length) {
+      PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
+      context.addStream(unionCollection, sideInputPorts[0]);
+    } else {
+      // the number of ports for side inputs is fixed and each port can only take one input.
+      for (int i = 0; i < sideInputs.size(); i++) {
+        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
+      }
+    }
+  }
+
+  /**
+   * Flattens the collections backing the given side inputs into one collection. Each source
+   * collection is assigned the index of its side input as union tag.
+   *
+   * @param sideInputs the side input views to merge, at least two
+   * @param context the translation context that collects operators and streams
+   * @return the collection that carries the union of all side input collections
+   * @throws IllegalArgumentException if fewer than two side inputs are given
+   * @throws UnsupportedOperationException if the side input collections use different coders
+   */
+  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
+    checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
+    // flatten and assign union tag
+    List<PCollection<Object>> sourceCollections = new ArrayList<>();
+    Map<PCollection<?>, Integer> unionTags = new HashMap<>();
+    PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
+    for (int i = 0; i < sideInputs.size(); i++) {
+      PCollectionView<?> sideInput = sideInputs.get(i);
+      PCollection<?> sideInputCollection = context.getViewInput(sideInput);
+      if (!sideInputCollection.getWindowingStrategy().equals(
+          firstSideInput.getWindowingStrategy())) {
+        // TODO: check how to handle this in stream codec
+        //String msg = "Multiple side inputs with different window strategies.";
+        //throw new UnsupportedOperationException(msg);
+        LOG.warn("Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+      }
+      if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
+        String msg = "Multiple side inputs with different coders.";
+        throw new UnsupportedOperationException(msg);
+      }
+      sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput));
+      unionTags.put(sideInputCollection, i);
+    }
+
+    // the result collection takes its properties and coder from the first side input
+    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
+        firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
+        context);
+    return resultCollection;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..c1ebbd5
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.List;
+
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
+ *
+ * <p>The operator is created with a new main output tag and an empty list of side output tags.
+ * Side inputs, if any, are connected through
+ * {@link ParDoBoundMultiTranslator#addSideInputs}.
+ */
+class ParDoBoundTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> fn = transform.getFn();
+    PCollection<OutputT> mainOutput = context.getOutput();
+    PCollection<InputT> mainInput = context.getInput();
+    List<PCollectionView<?>> views = transform.getSideInputs();
+
+    // input elements are encoded together with their window information
+    Coder<InputT> elementCoder = mainInput.getCoder();
+    WindowedValueCoder<InputT> windowedInputCoder = FullWindowedValueCoder.of(
+        elementCoder, mainInput.getWindowingStrategy().getWindowFn().windowCoder());
+
+    ApexParDoOperator<InputT, OutputT> parDoOperator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
+        fn,
+        new TupleTag<OutputT>(),
+        TupleTagList.empty().getAll() /*sideOutputTags*/,
+        mainOutput.getWindowingStrategy(),
+        views,
+        windowedInputCoder,
+        context.<Void>stateInternalsFactory());
+
+    context.addOperator(parDoOperator, parDoOperator.output);
+    context.addStream(mainInput, parDoOperator.input);
+    if (views.isEmpty()) {
+      return;
+    }
+    ParDoBoundMultiTranslator.addSideInputs(parDoOperator, views, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..b3034ac
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.InputOperator;
+
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * Translates {@link Read.Unbounded} to an Apex {@link InputOperator}
+ * that wraps the {@link UnboundedSource} of the transform.
+ *
+ * <p>The resulting operator has no input stream; only its output port is registered.
+ */
+class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> source = transform.getSource();
+    ApexReadUnboundedInputOperator<T, ?> readOperator =
+        new ApexReadUnboundedInputOperator<>(source, context.getPipelineOptions());
+    context.addOperator(readOperator, readOperator.output);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
new file mode 100644
index 0000000..eb81052
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Translates {@link PTransform} to Apex functions.
+ *
+ * <p>The {@code translate} method receives the transform to translate and the
+ * {@link TranslationContext} of the pipeline translation. Translators are {@link Serializable}.
+ *
+ * @param <T> the type of {@link PTransform} handled by the translator
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
+  void translate(T transform, TranslationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
new file mode 100644
index 0000000..e016730
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ *
+ * <p>The context tracks the transform that is currently translated and collects:
+ * <ul>
+ *   <li>the operators, keyed by a unique name,</li>
+ *   <li>for every {@link PCollection} the output port that produces it along with the input
+ *   ports that consume it (the streams),</li>
+ *   <li>for every registered {@link PCollectionView} the input of the transform that was current
+ *   when the view was added.</li>
+ * </ul>
+ *
+ * <p>{@code getInput}, {@code getOutput} and the {@code addOperator} variants require that the
+ * current transform was set through {@code setCurrentTransform}; otherwise an
+ * {@link IllegalArgumentException} is thrown. {@code addStream} throws an
+ * {@link IllegalArgumentException} when no output port was registered for the given input, and
+ * {@code getViewInput} when the view was never added.
+ *
+ * <p>{@code populateDAG} adds all collected operators to the {@link DAG}, adds one stream (named
+ * "stream" followed by a running index) for every collection that has at least one consumer, and
+ * sets a {@link CoderAdapterStreamCodec} on every consuming input port. The codec is based on the
+ * coder of the collection, wrapped into a {@link FullWindowedValueCoder} when the collection has
+ * a windowing strategy, and then into the {@link ApexStreamTuple} coder.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+class TranslationContext {
+
+  private final ApexPipelineOptions pipelineOptions;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
+  private final Map<String, Operator> operators = new HashMap<>();
+  private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
+
+  public void addView(PCollectionView<?> view) {
+    this.viewInputs.put(view, this.getInput());
+  }
+
+  public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
+    PInput input = this.viewInputs.get(view);
+    checkArgument(input != null, "unknown view " + view.getName());
+    return (InputT) input;
+  }
+
+  TranslationContext(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  public void setCurrentTransform(TransformTreeNode treeNode) {
+    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+  }
+
+  public ApexPipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  public <InputT extends PInput> InputT getInput() {
+    return (InputT) getCurrentTransform().getInput();
+  }
+
+  public <OutputT extends POutput> OutputT getOutput() {
+    return (OutputT) getCurrentTransform().getOutput();
+  }
+
+  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    checkArgument(currentTransform != null, "current transform not set");
+    return currentTransform;
+  }
+
+  public void addOperator(Operator operator, OutputPort port) {
+    addOperator(operator, port, this.<PCollection<?>>getOutput());
+  }
+
+  /**
+   * Register operator and output ports for the given collections.
+   *
+   * <p>The operator itself is registered once, together with the first entry of the map. For
+   * every further entry only the output port is recorded for its collection.
+   *
+   * @param operator the operator to register
+   * @param ports the output port of the operator for each collection it produces
+   */
+  public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) {
+    boolean first = true;
+    for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) {
+      if (first) {
+        addOperator(operator, portEntry.getValue(), portEntry.getKey());
+        first = false;
+      } else {
+        this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
+            new ArrayList<>()));
+      }
+    }
+  }
+
+  /**
+   * Add the operator with its output port for the given result {@link PCollection}.
+   *
+   * <p>The operator is stored under a name derived from the full name of the current transform,
+   * and a new stream entry without consumers is recorded for the output collection.
+   *
+   * @param operator the operator to register
+   * @param port the output port of the operator that produces the collection
+   * @param output the collection that is produced on the port
+   */
+  public void addOperator(Operator operator, OutputPort port, PCollection output) {
+    // Apex DAG requires a unique operator name:
+    // use the transform's full name and append a counter until the name is not taken yet
+    String name = getCurrentTransform().getFullName();
+    for (int i = 1; this.operators.containsKey(name); i++) {
+      name = getCurrentTransform().getFullName() + i;
+    }
+    this.operators.put(name, operator);
+    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
+  }
+
+  public void addStream(PInput input, InputPort inputPort) {
+    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
+    checkArgument(stream != null, "no upstream operator defined for %s", input);
+    stream.getRight().add(inputPort);
+  }
+
+  public void populateDAG(DAG dag) {
+    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
+      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
+    }
+    int streamIndex = 0;
+    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
+        streams.entrySet()) {
+      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
+      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
+      if (sinks.length > 0) {
+        dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
+        for (InputPort port : sinks) {
+          PCollection pc = streamEntry.getKey();
+          Coder coder = pc.getCoder();
+          if (pc.getWindowingStrategy() != null) {
+            coder = FullWindowedValueCoder.of(pc.getCoder(),
+                pc.getWindowingStrategy().getWindowFn().windowCoder()
+                );
+          }
+          Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder);
+          CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
+          dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the {@link StateInternalsFactory} for the pipeline translation.
+   *
+   * @return a new {@link ApexStateInternals.ApexStateInternalsFactory} on every call
+   */
+  public <K> StateInternalsFactory<K> stateInternalsFactory() {
+    return new ApexStateInternals.ApexStateInternalsFactory();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
new file mode 100644
index 0000000..3d9db51
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ *
+ * <p>The operator merges the tuples of the two input ports {@code data1} and {@code data2} into
+ * the single output port {@code out}.
+ *
+ * <p>Data tuples are forwarded as they arrive. When the tag configured for a port
+ * ({@code data1Tag} / {@code data2Tag}) is greater than zero, it is set as union tag on every
+ * {@link ApexStreamTuple.DataTuple} received on that port before the tuple is emitted.
+ *
+ * <p>For watermarks the operator remembers the highest timestamp seen per input
+ * ({@code inputWM1}, {@code inputWM2}). A watermark tuple is only forwarded when it advances the
+ * watermark of its own input and the new value does not exceed the watermark of the other input;
+ * {@code outputWM} records the timestamp of the watermark forwarded last.
+ *
+ * <p>NOTE(review): when the watermark of one input jumps past the watermark of the other input,
+ * nothing is emitted, although the minimum of both input watermarks may have advanced beyond
+ * {@code outputWM} - confirm that this lag is intended.
+ *
+ * <p>{@code traceTuples} enables debug logging of emitted tuples; it is {@code false} and no
+ * setter is visible in this class.
+ */
+public class ApexFlattenOperator<InputT> extends BaseOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
+  private boolean traceTuples = false;
+
+  private long inputWM1;
+  private long inputWM2;
+  private long outputWM;
+
+  public int data1Tag;
+  public int data2Tag;
+
+  /**
+   * Data input port 1. Tracks its watermark in {@code inputWM1} and tags data tuples with
+   * {@code data1Tag}.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+    /**
+     * Emits to port "out".
+     *
+     * <p>A watermark tuple is emitted only if its timestamp is greater than {@code inputWM1} and
+     * not greater than {@code inputWM2}; otherwise it is dropped. Any other tuple is emitted,
+     * after the union tag was set if {@code data1Tag} is greater than zero and the tuple is a
+     * data tuple.
+     */
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+      if (tuple instanceof WatermarkTuple) {
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
+        if (wmTuple.getTimestamp() > inputWM1) {
+          inputWM1 = wmTuple.getTimestamp();
+          if (inputWM1 <= inputWM2) {
+            // this input is the one that lags: move output watermark to its value and emit it
+            outputWM = inputWM1;
+            if (traceTuples) {
+              LOG.debug("\nemitting watermark {}\n", outputWM);
+            }
+            out.emit(tuple);
+          }
+        }
+        return;
+      }
+      if (traceTuples) {
+        LOG.debug("\nemitting {}\n", tuple);
+      }
+
+      if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
+      }
+      out.emit(tuple);
+    }
+  };
+
+  /**
+   * Data input port 2. Tracks its watermark in {@code inputWM2} and tags data tuples with
+   * {@code data2Tag}.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+    /**
+     * Emits to port "out".
+     *
+     * <p>A watermark tuple is emitted only if its timestamp is greater than {@code inputWM2} and
+     * not greater than {@code inputWM1}; otherwise it is dropped. Any other tuple is emitted,
+     * after the union tag was set if {@code data2Tag} is greater than zero and the tuple is a
+     * data tuple.
+     */
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+      if (tuple instanceof WatermarkTuple) {
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
+        if (wmTuple.getTimestamp() > inputWM2) {
+          inputWM2 = wmTuple.getTimestamp();
+          if (inputWM2 <= inputWM1) {
+            // this input is the one that lags: move output watermark to its value and emit it
+            outputWM = inputWM2;
+            if (traceTuples) {
+              LOG.debug("\nemitting watermark {}\n", outputWM);
+            }
+            out.emit(tuple);
+          }
+        }
+        return;
+      }
+      if (traceTuples) {
+        LOG.debug("\nemitting {}\n", tuple);
+      }
+
+      if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
+      }
+      out.emit(tuple);
+    }
+  };
+
+  /**
+   * Output port. Receives the tuples of both input ports; declared optional through
+   * {@link OutputPortFieldAnnotation}.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
+    new DefaultOutputPort<>();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
new file mode 100644
index 0000000..1b5e693
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link GroupByKey}.
+ * This operator expects the input stream already partitioned by K,
+ * which is determined by the {@link StreamCodec} on the input port.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ApexGroupByKeyOperator<K, V> implements Operator {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
+  private boolean traceTuples = true; // reassigned in setup()
+  // Taken from the input PCollection in the constructor; each is bound to Kryo's JavaSerializer.
+  @Bind(JavaSerializer.class)
+  private WindowingStrategy<V, BoundedWindow> windowingStrategy;
+  @Bind(JavaSerializer.class)
+  private Coder<K> keyCoder;
+  @Bind(JavaSerializer.class)
+  private Coder<V> valueCoder;
+  // The two maps below are keyed by the coder-encoded key bytes wrapped in a ByteBuffer.
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions serializedOptions;
+  @Bind(JavaSerializer.class)
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+  // context and fn are transient and are created in setup().
+  private transient ProcessContext context;
+  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
+  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; // set in processWatermark()
+
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple) {
+      try {
+        if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
+          ApexStreamTuple.WatermarkTuple<?> wm = (ApexStreamTuple.WatermarkTuple<?>) tuple;
+          processWatermark(wm); // runs the timers that became due before forwarding
+          if (traceTuples) {
+            LOG.debug("\nemitting watermark {}\n", wm.getTimestamp());
+          }
+          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
+              wm.getTimestamp()));
+        } else {
+          if (traceTuples) {
+            LOG.debug("\ninput {}\n", tuple.getValue());
+          }
+          processElement(tuple.getValue());
+        }
+      } catch (Exception e) {
+        Throwables.propagateIfPossible(e);
+        throw new RuntimeException(e);
+      }
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
+      output = new DefaultOutputPort<>(); // carries grouped values and forwarded watermarks
+
+  @SuppressWarnings("unchecked")
+  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
+      StateInternalsFactory<K> stateInternalsFactory) {
+    this.serializedOptions = new SerializablePipelineOptions(checkNotNull(pipelineOptions));
+    this.stateInternalsFactory = stateInternalsFactory;
+    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder(); // key and value coders come from it
+    this.keyCoder = kvCoder.getKeyCoder();
+    this.valueCoder = kvCoder.getValueCoder();
+  }
+
+  @SuppressWarnings("unused") // no-arg constructor for Kryo; leaves the final fields null
+  private ApexGroupByKeyOperator() {
+    this.serializedOptions = null;
+    this.stateInternalsFactory = null;
+  }
+
+  @Override
+  public void beginWindow(long l) { // no-op: nothing is done when a window begins
+  }
+
+  @Override
+  public void endWindow() { // no-op: nothing is done when a window ends
+  }
+
+  @Override
+  public void setup(OperatorContext context) {
+    traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
+    StateInternalsFactory<K> cachingFactory = new GroupByKeyStateInternalsFactory();
+    fn = GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, cachingFactory,
+        SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
+    this.context = new ProcessContext(fn, timerInternals); // parameter shadows the field
+  }
+
+  @Override
+  public void teardown() { // no-op: this operator releases nothing on teardown
+  }
+
+  /**
+   * Removes from {@code activeTimers} every timer whose timestamp is strictly before
+   * {@code currentWatermark} and returns the removed timers grouped by encoded key.
+   * Timers are held in sets, so a timer that was registered several times is returned once.
+   * Keys that are left without timers are dropped from {@code activeTimers}.
+   */
+  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+      long currentWatermark) {
+
+    // The due timers are only collected here and are fired later by the caller: a firing
+    // trigger may register another timer, which must not happen while activeTimers is
+    // being iterated (it would cause a concurrent modification exception).
+    Multimap<ByteBuffer, TimerInternals.TimerData> due = HashMultimap.create();
+
+    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> keys =
+        activeTimers.entrySet().iterator();
+    while (keys.hasNext()) {
+      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> entry = keys.next();
+      Set<TimerInternals.TimerData> timersForKey = entry.getValue();
+      Iterator<TimerInternals.TimerData> timers = timersForKey.iterator();
+      while (timers.hasNext()) {
+        TimerInternals.TimerData timer = timers.next();
+        if (timer.getTimestamp().isBefore(currentWatermark)) {
+          due.put(entry.getKey(), timer);
+          timers.remove();
+        }
+      }
+      // an entry whose last timer just became due is removed together with it
+      if (timersForKey.isEmpty()) {
+        keys.remove();
+      }
+    }
+    return due;
+  }
+
+  /** Wraps the element's value in a single-element work item and runs the DoFn on it. */
+  private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
+    final KV<K, V> keyed = windowedValue.getValue();
+
+    // same timestamp, windows and pane as the incoming element, but carrying only the value
+    final WindowedValue<V> valueOnly = WindowedValue.of(keyed.getValue(),
+        windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
+    final KeyedWorkItem<K, V> workItem =
+        KeyedWorkItems.elementsWorkItem(keyed.getKey(), Collections.singletonList(valueOnly));
+
+    // point the shared context at this work item and its key's state before invoking the DoFn
+    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+    fn.processElement(context);
+  }
+
+  /**
+   * Returns the state for {@code key}, created by stateInternalsFactory on first use and cached.
+   */
+  private StateInternals<K> getStateInternalsForKey(K key) {
+    final ByteBuffer keyBytes = encodeKey(key);
+    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
+    if (stateInternals == null) {
+      stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+      perKeyStateInternals.put(keyBytes, stateInternals);
+    }
+    return stateInternals;
+  }
+
+  /** Adds {@code timer} to the timers pending for {@code key}. */
+  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+    final ByteBuffer keyBytes = encodeKey(key);
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+      activeTimers.put(keyBytes, timersForKey);
+    }
+    timersForKey.add(timer);
+  }
+
+  /** Removes {@code timer} from the timers pending for {@code key}, if it is registered. */
+  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+    final ByteBuffer keyBytes = encodeKey(key);
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey == null) {
+      return;
+    }
+    timersForKey.remove(timer);
+    if (timersForKey.isEmpty()) {
+      activeTimers.remove(keyBytes);
+    }
+  }
+
+  /**
+   * Encodes {@code key} with {@code keyCoder} and wraps the bytes for use as a map key.
+   */
+  private ByteBuffer encodeKey(K key) {
+    try {
+      return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Advances the input watermark and runs the DoFn once per key whose timers became due. */
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
+    this.inputWatermark = new Instant(mark.getTimestamp());
+    Multimap<ByteBuffer, TimerInternals.TimerData> due =
+        getTimersReadyToProcess(mark.getTimestamp());
+    for (ByteBuffer encodedKey : due.keySet()) {
+      // the map key is the coder-encoded key, so decode it before building the work item
+      K key = CoderUtils.decodeFromByteArray(keyCoder, encodedKey.array());
+      KeyedWorkItem<K, V> workItem = KeyedWorkItems.<K, V>timersWorkItem(key, due.get(encodedKey));
+      context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+      fn.processElement(context);
+    }
+  }
+
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
+      KeyedWorkItem<K, V>>.ProcessContext {
+    // One instance is reused for all work items; setElement() swaps in the current item and state.
+    private final ApexTimerInternals timerInternals;
+    private StateInternals<K> stateInternals;
+    private KeyedWorkItem<K, V> element;
+
+    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
+                          ApexTimerInternals timerInternals) {
+      function.super();
+      this.timerInternals = checkNotNull(timerInternals);
+    }
+
+    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
+      this.element = element;
+      this.stateInternals = stateForKey;
+    }
+
+    @Override
+    public KeyedWorkItem<K, V> element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      throw new UnsupportedOperationException(
+          "timestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return serializedOptions.get();
+    }
+
+    @Override
+    public void output(KV<K, Iterable<V>> output) {
+      throw new UnsupportedOperationException(
+          "output() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
+      throw new UnsupportedOperationException(
+          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException(
+          "pane() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "window() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
+      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
+
+        @Override
+        public StateInternals<K> stateInternals() {
+          return stateInternals;
+        }
+
+        @Override
+        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          if (traceTuples) {
+            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+          }
+          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
+              WindowedValue.of(output, timestamp, windows, pane)));
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return timerInternals;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+        }
+
+        @Override
+        public PaneInfo pane() {
+          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
+          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          throw new RuntimeException("sideInput() is not available in Streaming mode.");
+        }
+      };
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // Side outputs are not supported here: the call is rejected with an exception rather than
+      // ignored, even when a user outputs to a freshly created, unregistered TupleTag.
+      throw new RuntimeException("sideOutput() is not available when grouping by window.");
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutput(tag, output); // always throws, see sideOutput()
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * An implementation of Beam's {@link TimerInternals}.
+   * Timers are kept per key in {@code activeTimers}, for the key of the context's current element.
+   */
+  public class ApexTimerInternals implements TimerInternals {
+
+    @Override
+    public void setTimer(TimerData timerKey) {
+      registerActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      unregisterActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return Instant.now();
+    }
+
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      // TODO not implemented yet: callers currently receive null
+      return null;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return inputWatermark;
+    }
+
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      // TODO not implemented yet: callers currently receive null
+      return null;
+    }
+
+    @Override
+    public void setTimer(StateNamespace namespace, String timerId, Instant target,
+        TimeDomain timeDomain) {
+      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+    }
+
+  }
+
+  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return getStateInternalsForKey(key); // created on first use, then cached by the operator
+    }
+  }
+}