You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/17 16:50:54 UTC

[02/11] incubator-beam git commit: BEAM-261 Apex runner PoC

BEAM-261 Apex runner PoC


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaf38ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaf38ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaf38ddf

Branch: refs/heads/apex-runner
Commit: aaf38ddfe53bbb67fad4456ee1068d18b9b891b5
Parents: 49f9444
Author: Thomas Weise <th...@apache.org>
Authored: Mon Jun 27 11:24:13 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Oct 16 23:21:55 2016 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            | 226 ++++++++++
 .../beam/runners/apex/ApexPipelineOptions.java  |  60 +++
 .../runners/apex/ApexPipelineTranslator.java    | 134 ++++++
 .../apache/beam/runners/apex/ApexRunner.java    | 171 ++++++++
 .../beam/runners/apex/ApexRunnerResult.java     |  85 ++++
 .../beam/runners/apex/TestApexRunner.java       |  56 +++
 .../translators/CreateValuesTranslator.java     |  49 +++
 .../FlattenPCollectionTranslator.java           |  52 +++
 .../apex/translators/GroupByKeyTranslator.java  |  41 ++
 .../apex/translators/ParDoBoundTranslator.java  |  43 ++
 .../translators/ReadUnboundedTranslator.java    |  42 ++
 .../apex/translators/TransformTranslator.java   |  31 ++
 .../apex/translators/TranslationContext.java    | 143 +++++++
 .../functions/ApexGroupByKeyOperator.java       | 427 +++++++++++++++++++
 .../functions/ApexParDoOperator.java            | 177 ++++++++
 .../io/ApexReadUnboundedInputOperator.java      | 125 ++++++
 .../apex/translators/io/ValuesSource.java       | 152 +++++++
 .../apex/translators/utils/ApexStreamTuple.java | 191 +++++++++
 .../utils/CoderAdapterStreamCodec.java          |  73 ++++
 .../translators/utils/NoOpSideInputReader.java  |  47 ++
 .../apex/translators/utils/NoOpStepContext.java |  73 ++++
 .../utils/SerializablePipelineOptions.java      |  61 +++
 .../apex/examples/StreamingWordCountTest.java   | 120 ++++++
 .../apex/examples/UnboundedTextSource.java      | 144 +++++++
 .../FlattenPCollectionTranslatorTest.java       |  97 +++++
 .../translators/GroupByKeyTranslatorTest.java   | 248 +++++++++++
 .../translators/ParDoBoundTranslatorTest.java   | 164 +++++++
 .../translators/ReadUnboundTranslatorTest.java  | 130 ++++++
 .../translators/utils/CollectionSource.java     | 137 ++++++
 .../translators/utils/PipelineOptionsTest.java  |  82 ++++
 .../apex/src/test/resources/log4j.properties    |  33 ++
 runners/pom.xml                                 |   1 +
 32 files changed, 3615 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
new file mode 100644
index 0000000..bb08b3c
--- /dev/null
+++ b/runners/apex/pom.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-apex_3.4.0</artifactId>
+
+  <name>Apache Beam :: Runners :: Apex</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <apex.core.version>3.4.0</apex.core.version>
+    <apex.malhar.version>3.4.0</apex.malhar.version>
+    <skipIntegrationTests>true</skipIntegrationTests>
+    <!-- memory limit for embedded cluster -->
+    <surefire.args>-Xmx2048m</surefire.args>
+  </properties>
+
+  <dependencies>
+    <!-- Apex dependencies -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${apex.malhar.version}</version>
+    </dependency>
+    <dependency>
+       <groupId>com.fasterxml.jackson.core</groupId>
+       <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!--- Beam -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+		<!-- javax.annotation.Nullable -->
+       <groupId>com.google.code.findbugs</groupId>
+       <artifactId>annotations</artifactId>
+    </dependency>
+
+    <!-- Test scoped -->
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Depend on test jar to scan for RunnableOnService tests -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!--dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-examples-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency-->
+    <!-- Optional Pipeline Registration -->
+    <!--dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency-->
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <!-- Checkstyle errors for now
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      -->
+
+      <!-- Integration Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+
+      <!-- Unit Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>${surefire.args}</argLine>
+        </configuration>
+        <executions>
+          <execution>
+            <id>runnable-on-service-tests</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <parallel>none</parallel>
+              <failIfNoTests>true</failIfNoTests>
+              <dependenciesToScan>
+                <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+              </dependenciesToScan>
+              <systemPropertyVariables>
+                <beamTestPipelineOptions>
+                  [
+                    "--runner=org.apache.beam.runners.apex.TestApexRunner",
+                    "--streaming=true"
+                  ]
+                </beamTestPipelineOptions>
+              </systemPropertyVariables>
+              <skipTests>${skipIntegrationTests}</skipTests>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>analyze-only</goal>
+            </goals>
+            <configuration>
+              <ignoredUsedUndeclaredDependencies>
+                <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.4.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.2.0</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
+              </ignoredUsedUndeclaredDependencies>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
new file mode 100644
index 0000000..f70d24c
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that configure the Apex pipeline.
+ */
+public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializable {
+
+  @Description("set unique application name for Apex runner")
+  void setApplicationName(String name);
+
+  String getApplicationName();
+
+  @Description("set parallelism for Apex runner")
+  void setParallelism(int parallelism);
+
+  @Default.Integer(1)
+  int getParallelism();
+
+  @Description("execute the pipeline with embedded cluster")
+  void setEmbeddedExecution(boolean embedded);
+
+  @Default.Boolean(true)
+  boolean isEmbeddedExecution();
+
+  @Description("configure embedded execution with debug friendly options")
+  void setEmbeddedExecutionDebugMode(boolean embeddedDebug);
+
+  @Default.Boolean(true)
+  boolean isEmbeddedExecutionDebugMode();
+
+  @Description("how long the client should wait for the pipeline to run")
+  void setRunMillis(long runMillis);
+
+  @Default.Long(0)
+  long getRunMillis();
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
new file mode 100644
index 0000000..8ea7139
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
+import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
+import org.apache.beam.runners.apex.translators.TransformTranslator;
+import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
+ * into Apex logical plan {@link DAG}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ApexPipelineTranslator.class);
+
+  /**
+   * A map from {@link PTransform} subclass to the corresponding
+   * {@link TransformTranslator} to use to translate that transform.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  private final TranslationContext translationContext;
+
+  static {
+    // register TransformTranslators
+    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+        new FlattenPCollectionTranslator());
+    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+  }
+
+  public ApexPipelineTranslator(TranslationContext translationContext) {
+    this.translationContext = translationContext;
+  }
+
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform transform = node.getTransform();
+    TransformTranslator translator = getTransformTranslator(transform.getClass());
+    if (null == translator) {
+      throw new IllegalStateException(
+          "no translator registered for " + transform);
+    }
+    translationContext.setCurrentTransform(node);
+    translator.translate(transform, translationContext);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Records that instances of the specified PTransform class
+   * should be translated by default by the corresponding
+   * {@link TransformTranslator}.
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    if (transformTranslators.put(transformClass, transformTranslator) != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} to use for instances of the
+   * specified PTransform class, or null if none registered.
+   */
+  private <TransformT extends PTransform<?,?>>
+  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
new file mode 100644
index 0000000..87c8f97
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.base.Throwables;
+
+/**
+ * A {@link PipelineRunner} that translates the
+ * pipeline to an Apex DAG and executes it on an Apex cluster.
+ * <p>
+ * Currently execution is always in embedded mode,
+ * launch on Hadoop cluster will be added in subsequent iteration.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
+
+  private final ApexPipelineOptions options;
+
+  public ApexRunner(ApexPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static ApexRunner fromOptions(PipelineOptions options) {
+    return new ApexRunner((ApexPipelineOptions) options);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (Window.Bound.class.equals(transform.getClass())) {
+      return (OutputT) ((PCollection) input).apply(
+          new AssignWindowsAndSetStrategy((Window.Bound) transform));
+    } else if (Create.Values.class.equals(transform.getClass())) {
+      return (OutputT) PCollection
+          .<OutputT>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              PCollection.IsBounded.BOUNDED);
+    } else if (Read.Bounded.class.equals(transform.getClass())) {
+      return (OutputT) ((PBegin) input).apply(new UnboundedReadFromBoundedSource<>(((Read.Bounded)transform).getSource()));
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+  @Override
+  public ApexRunnerResult run(Pipeline pipeline) {
+
+    final TranslationContext translationContext = new TranslationContext(options);
+    ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
+    translator.translate(pipeline);
+
+    StreamingApplication apexApp = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
+        translationContext.populateDAG(dag);
+      }
+    };
+
+    checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time");
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    try {
+      lma.prepareDAG(apexApp, conf);
+      LocalMode.Controller lc = lma.getController();
+      if (options.isEmbeddedExecutionDebugMode()) {
+        // turns off timeout checking for operator progress
+        lc.setHeartbeatMonitoringEnabled(false);
+      }
+      if (options.getRunMillis() > 0) {
+        lc.run(options.getRunMillis());
+      } else {
+        lc.runAsync();
+      }
+      return new ApexRunnerResult(lma.getDAG(), lc);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * copied from DirectPipelineRunner.
+   * used to replace Window.Bound till equivalent function is added in Apex
+   */
+  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
new file mode 100644
index 0000000..f28c8dc
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Result of executing a {@link Pipeline} with Apex in embedded mode.
+ */
+public class ApexRunnerResult implements PipelineResult {
+  private final DAG apexDAG;
+  private final LocalMode.Controller ctrl;
+  private State state = State.UNKNOWN;
+
+  public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) {
+    this.apexDAG = dag;
+    this.ctrl = ctrl;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    return null;
+  }
+
+  @Override
+  public State cancel() throws IOException
+  {
+    ctrl.shutdown();
+    state = State.CANCELLED;
+    return state;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public State waitUntilFinish() throws IOException, InterruptedException
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Return the DAG executed by the pipeline.
+   * @return
+   */
+  public DAG getApexDAG() {
+    return apexDAG;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
new file mode 100644
index 0000000..45c143e
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+
+public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
+
+  private ApexRunner delegate;
+
+  private TestApexRunner(ApexPipelineOptions options) {
+    options.setEmbeddedExecution(true);
+    //options.setEmbeddedExecutionDebugMode(false);
+    options.setRunMillis(20000);
+    this.delegate = ApexRunner.fromOptions(options);
+  }
+
+  public static TestApexRunner fromOptions(PipelineOptions options) {
+    ApexPipelineOptions apexOptions = PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+    return new TestApexRunner(apexOptions);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  @Override
+  public ApexRunnerResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
new file mode 100644
index 0000000..387b19f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+
+import com.google.common.base.Throwables;
+
+
+/**
+ * Wraps elements from Create.Values into an {@link UnboundedSource}.
+ * mainly used for test
+ */
+public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+  private static final long serialVersionUID = 1451000241832745629L;
+
+  @Override
+  public void translate(Create.Values<T> transform, TranslationContext context) {
+    try {
+      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
+          transform.getDefaultOutputCoder(context.getInput()));
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(unboundedSource,
+          context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    } catch (CannotProvideCoderException e) {
+      Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..f228149
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.datatorrent.lib.stream.StreamMerger;
+
+/**
+ * Flatten.FlattenPCollectionList translation to Apex operator.
+ * TODO: support more than two streams
+ */
+public class FlattenPCollectionTranslator<T> implements
+    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+    StreamMerger<T> operator = null;
+    PCollectionList<T> collections = context.getInput();
+    if (collections.size() > 2) {
+      throw new UnsupportedOperationException("Currently supports only 2 collections: " + transform);
+    }
+    for (PCollection<T> collection : collections.getAll()) {
+      if (null == operator) {
+        operator = new StreamMerger<T>();
+        context.addStream(collection, operator.data1);
+      } else {
+        context.addStream(collection, operator.data2);
+      }
+    }
+    context.addOperator(operator, operator.out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
new file mode 100644
index 0000000..43c82a9
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link GroupByKey} translation to Apex operator.
+ */
+public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+
+    PCollection<KV<K, V>> input = context.getInput();
+    ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), input);
+    context.addOperator(group, group.output);
+    context.addStream(input, group.input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
new file mode 100644
index 0000000..a958234
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn}
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    PCollection<OutputT> output = context.getOutput();
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+        doFn, output.getWindowingStrategy(), new NoOpSideInputReader());
+    context.addOperator(operator, operator.output);
+    context.addStream(context.getInput(), operator.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..b53e4dd
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+import com.datatorrent.api.InputOperator;
+
+/**
+ * {@link Read.Unbounded} is translated to Apex {@link InputOperator}
+ * that wraps {@link UnboundedSource}.
+ */
+public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> unboundedSource = transform.getSource();
+    ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+        unboundedSource, context.getPipelineOptions());
+    context.addOperator(operator, operator.output);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
new file mode 100644
index 0000000..1a99885
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.io.Serializable;
+
+/**
+ * translates {@link PTransform} to Apex functions.
+ */
+public interface TransformTranslator<T extends PTransform<?,?>> extends Serializable {
+  void translate(T transform, TranslationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
new file mode 100644
index 0000000..92afd58
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TranslationContext {
+
+  private final ApexPipelineOptions pipelineOptions;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
+  private final Map<String, Operator> operators = new HashMap<>();
+
+  public TranslationContext(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  public void setCurrentTransform(TransformTreeNode treeNode) {
+    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+  }
+
+  public ApexPipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  public <InputT extends PInput> InputT getInput() {
+    return (InputT) getCurrentTransform().getInput();
+  }
+
+  public <OutputT extends POutput> OutputT getOutput() {
+    return (OutputT) getCurrentTransform().getOutput();
+  }
+
+  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    checkArgument(currentTransform != null, "current transform not set");
+    return currentTransform;
+  }
+
+  public void addOperator(Operator operator, OutputPort port) {
+    // Apex DAG requires a unique operator name
+    // use the transform's name and make it unique
+    String name = getCurrentTransform().getFullName();
+    for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++);
+    this.operators.put(name, operator);
+    PCollection<?> output = getOutput();
+    this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
+  }
+
+  /**
+   * Add operator that is internal to a transformation.
+   * @param output
+   * @param operator
+   * @param port
+   * @param name
+   */
+  public <T> PInput addInternalOperator(Operator operator, OutputPort port, String name, Coder<T> coder) {
+    checkArgument(this.operators.get(name) == null, "duplicate operator " + name);
+    this.operators.put(name, operator);
+    PCollection<T> input = getInput();
+    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    output.setCoder(coder);
+    this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
+    return output;
+  }
+
+  public void addStream(PInput input, InputPort inputPort) {
+    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
+    checkArgument(stream != null, "no upstream operator defined");
+    stream.getRight().add(inputPort);
+  }
+
+  public void populateDAG(DAG dag) {
+    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
+      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
+    }
+    int streamIndex = 0;
+    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.streams.entrySet()) {
+      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
+      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
+      if (sinks.length > 0) {
+        dag.addStream("stream"+streamIndex++, streamEntry.getValue().getLeft(), sinks);
+        for (InputPort port : sinks) {
+          PCollection pc = streamEntry.getKey();
+          Coder coder = pc.getCoder();
+          if (pc.getWindowingStrategy() != null) {
+            coder = FullWindowedValueCoder.of(pc.getCoder(),
+                pc.getWindowingStrategy().getWindowFn().windowCoder()
+                );
+          }
+          Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder);
+          CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
+          dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
new file mode 100644
index 0000000..4608c92
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.functions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Apex operator for Beam {@link GroupByKey}.
+ * This operator expects the input stream already partitioned by K,
+ * which is determined by the {@link StreamCodec} on the input port.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ApexGroupByKeyOperator<K, V> implements Operator
+{
+  @Bind(JavaSerializer.class)
+  private WindowingStrategy<V, BoundedWindow> windowingStrategy;
+  @Bind(JavaSerializer.class)
+  private Coder<V> valueCoder;
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions serializedOptions;
+  @Bind(JavaSerializer.class)
+  private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private transient ProcessContext context;
+  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
+  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+  private Instant inputWatermark = new Instant(0);
+
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>()
+  {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
+    {
+      //System.out.println("\n***RECEIVED: " +t);
+      try {
+        if (t instanceof ApexStreamTuple.WatermarkTuple) {
+          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
+          processWatermark(mark);
+          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
+          return;
+        }
+        processElement(t.getValue());
+      } catch (Exception e) {
+        Throwables.propagate(e);
+      }
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>();
+
+  @SuppressWarnings("unchecked")
+  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input)
+  {
+    Preconditions.checkNotNull(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
+    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
+    this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ApexGroupByKeyOperator()
+  {
+    this.serializedOptions = null;
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
+    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
+        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
+    this.context = new ProcessContext(fn, this.timerInternals);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Returns the list of timers that are ready to fire. These are the timers
+   * that are registered to be triggered at a time before the current watermark.
+   * We keep these timers in a Set, so that they are deduplicated, as the same
+   * timer can be registered multiple times.
+   */
+  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+
+    // we keep the timers to return in a different list and launch them later
+    // because we cannot prevent a trigger from registering another trigger,
+    // which would lead to concurrent modification exception.
+    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        TimerInternals.TimerData timerData = timerIt.next();
+        if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          toFire.put(keyWithTimers.getKey(), timerData);
+          timerIt.remove();
+        }
+      }
+
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
+    final KV<K, V> kv = windowedValue.getValue();
+    final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
+        windowedValue.getTimestamp(),
+        windowedValue.getWindows(),
+        windowedValue.getPane());
+
+    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
+            kv.getKey(),
+            Collections.singletonList(updatedWindowedValue));
+
+    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+    fn.processElement(context);
+  }
+
+  private StateInternals<K> getStateInternalsForKey(K key) {
+    StateInternals<K> stateInternals = perKeyStateInternals.get(key);
+    if (stateInternals == null) {
+      //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+      //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
+      stateInternals = InMemoryStateInternals.forKey(key);
+      perKeyStateInternals.put(key, stateInternals);
+    }
+    return stateInternals;
+  }
+
+  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+    }
+    timersForKey.add(timer);
+    activeTimers.put(key, timersForKey);
+  }
+
+  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey != null) {
+      timersForKey.remove(timer);
+      if (timersForKey.isEmpty()) {
+        activeTimers.remove(key);
+      } else {
+        activeTimers.put(key, timersForKey);
+      }
+    }
+  }
+
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
+    this.inputWatermark = new Instant(mark.getTimestamp());
+    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+    if (!timers.isEmpty()) {
+      for (K key : timers.keySet()) {
+        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key));
+        context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+        fn.processElement(context);
+      }
+    }
+  }
+
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext {
+
+    private final ApexTimerInternals timerInternals;
+    private StateInternals<K> stateInternals;
+    private KeyedWorkItem<K, V> element;
+
+    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
+                          ApexTimerInternals timerInternals) {
+      function.super();
+      this.timerInternals = Preconditions.checkNotNull(timerInternals);
+    }
+
+    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
+      this.element = element;
+      this.stateInternals = stateForKey;
+    }
+
+    @Override
+    public KeyedWorkItem<K, V> element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return serializedOptions.get();
+    }
+
+    @Override
+    public void output(KV<K, Iterable<V>> output) {
+      throw new UnsupportedOperationException(
+          "output() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
+      throw new UnsupportedOperationException(
+          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "window() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
+      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
+
+        @Override
+        public StateInternals<K> stateInternals() {
+          return stateInternals;
+        }
+
+        @Override
+        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp);
+          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return timerInternals;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+        }
+
+        @Override
+        public PaneInfo pane() {
+          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          throw new RuntimeException("sideInput() is not available in Streaming mode.");
+        }
+      };
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // ignore the side output, this can happen when a user does not register
+      // side outputs but then outputs using a freshly created TupleTag.
+      throw new RuntimeException("sideOutput() is not available when grouping by window.");
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * An implementation of Beam's {@link TimerInternals}.
+   *
+   */
+  public class ApexTimerInternals implements TimerInternals {
+
+    @Override
+    public void setTimer(TimerData timerKey)
+    {
+      registerActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey)
+    {
+      unregisterActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public Instant currentProcessingTime()
+    {
+      return Instant.now();
+    }
+
+    @Override
+    public Instant currentSynchronizedProcessingTime()
+    {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime()
+    {
+      return inputWatermark;
+    }
+
+    @Override
+    public Instant currentOutputWatermarkTime()
+    {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
+  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable
+  {
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key)
+    {
+      return getStateInternalsForKey(key);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
new file mode 100644
index 0000000..8005832
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.functions;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Apex operator for Beam {@link DoFn}.
+ */
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+
+  private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
+  private transient DoFnRunner<InputT, OutputT> doFnRunner;
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions pipelineOptions;
+  @Bind(JavaSerializer.class)
+  private final OldDoFn<InputT, OutputT> doFn;
+  @Bind(JavaSerializer.class)
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  @Bind(JavaSerializer.class)
+  private final SideInputReader sideInputReader;
+
+  public ApexParDoOperator(
+      ApexPipelineOptions pipelineOptions,
+      OldDoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      SideInputReader sideInputReader) {
+    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ApexParDoOperator() {
+    this(null, null, null, null);
+  }
+
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+  {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> t)
+    {
+      if (t instanceof ApexStreamTuple.WatermarkTuple) {
+        output.emit(t);
+      } else {
+        System.out.println("\n" + Thread.currentThread().getName() + "\n" + t.getValue() + "\n");
+        doFnRunner.processElement(t.getValue());
+      }
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
+
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
+  {
+    output.emit(ApexStreamTuple.DataTuple.of(tuple));
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(),
+        doFn,
+        sideInputReader,
+        this,
+        mainTag,
+        TupleTagList.empty().getAll(),
+        new NoOpStepContext(),
+        new NoOpAggregatorFactory(),
+        windowingStrategy
+        );
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    doFnRunner.startBundle();
+    /*
+    Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
+    if (!aggregators.isEmpty()) {
+      System.out.println("\n" + Thread.currentThread().getName() + "\n" +AggregatorRetriever.getAggregators(doFn) + "\n");
+    }
+    */
+  }
+
+  @Override
+  public void endWindow()
+  {
+    doFnRunner.finishBundle();
+  }
+
+  /**
+   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
+   * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
+   */
+  public class NoOpAggregatorFactory implements AggregatorFactory {
+
+    private NoOpAggregatorFactory() {
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext step,
+        String name, CombineFn<InputT, AccumT, OutputT> combine) {
+      return new Aggregator<InputT, OutputT>() {
+
+        @Override
+        public void addValue(InputT value)
+        {
+        }
+
+        @Override
+        public String getName()
+        {
+          // TODO Auto-generated method stub
+          return null;
+        }
+
+        @Override
+        public CombineFn<InputT, ?, OutputT> getCombineFn()
+        {
+          // TODO Auto-generated method stub
+          return null;
+        }
+
+      };
+    }
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
new file mode 100644
index 0000000..39114fe
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.io;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.joda.time.Instant;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.google.common.base.Throwables;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.IOException;
+
+/**
+ * Apex input operator that wraps Beam UnboundedSource.
+ */
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+    implements InputOperator {
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions pipelineOptions;
+  @Bind(JavaSerializer.class)
+  private final UnboundedSource<OutputT, CheckpointMarkT> source;
+  private transient UnboundedSource.UnboundedReader<OutputT> reader;
+  private transient boolean available = false;
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
+
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
+    this.pipelineOptions = new SerializablePipelineOptions(options);
+    this.source = source;
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ApexReadUnboundedInputOperator() {
+    this.pipelineOptions = null; this.source = null;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    Instant mark = reader.getWatermark();
+    output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis()));
+    if (!available && source instanceof ValuesSource) {
+      // if it's a Create transformation and the input was consumed,
+      // terminate the stream (allows tests to finish faster)
+      BaseOperator.shutdown();
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    try {
+      reader = source.createReader(this.pipelineOptions.get(), null);
+      available = reader.start();
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    try {
+      if (!available) {
+        available = reader.advance();
+      }
+      if (available) {
+        OutputT data = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+        available = reader.advance();
+        output.emit(DataTuple.of(WindowedValue.of(
+            data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
+      }
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
new file mode 100644
index 0000000..2c4b298
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.io;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import com.google.common.base.Throwables;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * unbounded source that reads from a Java {@link Iterable}.
+ */
+public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+  private static final long serialVersionUID = 1L;
+
+  private final byte[] codedValues;
+  private final IterableCoder<T> iterableCoder;
+
+  public ValuesSource(Iterable<T> values, Coder<T> coder) {
+    this.iterableCoder = IterableCoder.of(coder);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      iterableCoder.encode(values, bos, Context.OUTER);
+    } catch (IOException ex) {
+      Throwables.propagate(ex);
+    }
+    this.codedValues = bos.toByteArray();
+  }
+
+  @Override
+  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
+    try {
+      Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
+      return new ValuesReader<>(values, this);
+    } catch (IOException ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  @Override
+  public void validate() {
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return iterableCoder.getElemCoder();
+  }
+
+  private static class ValuesReader<T> extends UnboundedReader<T> {
+
+    private final Iterable<T> values;
+    private final UnboundedSource<T, CheckpointMark> source;
+    private transient Iterator<T> iterator;
+    private T current;
+
+    public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
+      this.values = values;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      if (null == iterator) {
+        iterator = values.iterator();
+      }
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<T, ?> getCurrentSource() {
+      return source;
+    }
+  }
+}