You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/20 16:11:29 UTC

[2/3] incubator-beam git commit: [BEAM-79] add Gearpump runner

[BEAM-79] add Gearpump runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9478f411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9478f411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9478f411

Branch: refs/heads/gearpump-runner
Commit: 9478f4117de3a2d0ea40614ed4cb801918610724
Parents: eb69e10
Author: manuzhang <ow...@gmail.com>
Authored: Tue Mar 15 16:15:16 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jul 20 08:42:52 2016 +0800

----------------------------------------------------------------------
 runners/gearpump/README.md                      |  22 +
 runners/gearpump/pom.xml                        | 296 +++++++++++
 .../gearpump/GearpumpPipelineOptions.java       |  67 +++
 .../gearpump/GearpumpPipelineResult.java        |  42 ++
 .../gearpump/GearpumpPipelineRunner.java        | 193 +++++++
 .../GearpumpPipelineRunnerRegistrar.java        |  63 +++
 .../gearpump/GearpumpPipelineTranslator.java    | 139 +++++
 .../runners/gearpump/TestGearpumpRunner.java    |  64 +++
 .../gearpump/examples/StreamingWordCount.java   | 105 ++++
 .../gearpump/examples/UnboundedTextSource.java  | 138 +++++
 .../translators/CreateValuesTranslator.java     |  49 ++
 .../FlattenPCollectionTranslator.java           |  47 ++
 .../translators/GroupByKeyTranslator.java       | 103 ++++
 .../translators/ParDoBoundMultiTranslator.java  | 154 ++++++
 .../translators/ParDoBoundTranslator.java       |  54 ++
 .../translators/ReadBoundedTranslator.java      |  44 ++
 .../translators/ReadUnboundedTranslator.java    |  46 ++
 .../translators/TransformTranslator.java        |  31 ++
 .../translators/TranslationContext.java         |  95 ++++
 .../translators/functions/DoFnFunction.java     |  88 ++++
 .../translators/io/BoundedSourceWrapper.java    |  44 ++
 .../gearpump/translators/io/GearpumpSource.java | 100 ++++
 .../translators/io/UnboundedSourceWrapper.java  |  45 ++
 .../gearpump/translators/io/ValuesSource.java   | 164 ++++++
 .../translators/utils/GearpumpDoFnRunner.java   | 513 +++++++++++++++++++
 .../translators/utils/NoOpSideInputReader.java  |  48 ++
 .../translators/utils/NoOpStepContext.java      |  71 +++
 runners/pom.xml                                 |  11 +
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +
 29 files changed, 2838 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/README.md
----------------------------------------------------------------------
diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
new file mode 100644
index 0000000..ad043fa
--- /dev/null
+++ b/runners/gearpump/README.md
@@ -0,0 +1,22 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Gearpump Beam Runner
+
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine. 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
new file mode 100644
index 0000000..c725dae
--- /dev/null
+++ b/runners/gearpump/pom.xml
@@ -0,0 +1,296 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+         xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>0.2.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-gearpump</artifactId>
+
+  <name>Apache Beam :: Runners :: Gearpump</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <gearpump.version>0.8.1-SNAPSHOT</gearpump.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>gearpump-shaded-repo</id>
+      <name>Vincent at Bintray</name>
+      <url>http://dl.bintray.com/fvunicorn/maven</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-streaming_2.11</artifactId>
+      <version>${gearpump.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-core_2.11</artifactId>
+      <version>${gearpump.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-daemon_2.11</artifactId>
+      <version>${gearpump.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.gearpump</groupId>
+          <artifactId>gearpump-experimental-cgroup_2.11</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>config</artifactId>
+      <scope>provided</scope>
+      <version>1.3.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.11.8</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.collections</groupId>
+          <artifactId>google-collections</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- JAR Packaging -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+              <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+
+      <!-- Java compiler -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <!-- Integration Tests -->
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <forkCount>1</forkCount>
+          <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+        </configuration>
+      </plugin>
+
+      <!-- Unit Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.19.1</version>
+        <executions>
+          <execution>
+            <id>runnable-on-service-tests</id>
+            <configuration>
+              <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <parallel>none</parallel>
+              <failIfNoTests>true</failIfNoTests>
+              <!-- Entries are groupId:artifactId and must match dependencies declared above. -->
+              <dependenciesToScan>
+                <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+                <dependency>org.apache.beam:beam-runners-core-java</dependency>
+              </dependenciesToScan>
+              <excludes>
+                <!-- side input is not supported in Gearpump -->
+                <exclude>
+                  org.apache.beam.sdk.io.BigQueryIOTest,
+                  org.apache.beam.sdk.io.CountingInputTest,
+                  org.apache.beam.sdk.io.CountingSourceTest,
+                  org.apache.beam.sdk.testing.PAssertTest,
+                  org.apache.beam.sdk.transforms.ApproximateUniqueTest,
+                  org.apache.beam.sdk.transforms.CombineTest,
+                  org.apache.beam.sdk.transforms.CombineFnsTest,
+                  org.apache.beam.sdk.transforms.CountTest,
+                  org.apache.beam.sdk.transforms.FlattenTest,
+                  org.apache.beam.sdk.transforms.ParDoTest,
+                  org.apache.beam.sdk.transforms.SampleTest,
+                  org.apache.beam.sdk.transforms.ViewTest,
+                  org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
+                </exclude>
+                <!-- merging windows is not supported in Gearpump -->
+                <exclude>
+                  org.apache.beam.sdk.transforms.windowing.WindowingTest,
+                  org.apache.beam.sdk.util.ReshuffleTest
+                </exclude>
+              </excludes>
+              <!-- Pipeline options handed to the tests through a system property. -->
+              <systemPropertyVariables>
+                <beamTestPipelineOptions>
+                  [
+                  "--runner=TestGearpumpRunner",
+                  "--streaming=true"
+                  ]
+                </beamTestPipelineOptions>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- uber jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
new file mode 100644
index 0000000..5b6ee96
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+import java.util.Map;
+
+/**
+ * Options that configure the Gearpump pipeline.
+ *
+ * <p>Every {@link Description} is attached to the property's getter so that all
+ * properties of this interface are annotated in the same place.
+ */
+public interface GearpumpPipelineOptions extends PipelineOptions {
+
+  void setApplicationName(String name);
+
+  /** Name under which the application is submitted; may be unset. */
+  @Description("set unique application name for Gearpump runner")
+  String getApplicationName();
+
+  void setParallelism(int parallelism);
+
+  /** Processor parallelism; defaults to 1. */
+  @Description("set parallelism for Gearpump processor")
+  @Default.Integer(1)
+  int getParallelism();
+
+  void setSerializers(Map<String, String> serializers);
+
+  /** Additional serializer registrations supplied by the user; excluded from JSON. */
+  @Description("register Kryo serializers")
+  @JsonIgnore
+  Map<String, String> getSerializers();
+
+  void setEmbeddedCluster(EmbeddedCluster cluster);
+
+  /** Embedded cluster to run against, if one was set; excluded from JSON. */
+  @Description("set EmbeddedCluster for tests")
+  @JsonIgnore
+  EmbeddedCluster getEmbeddedCluster();
+
+  void setClientContext(ClientContext clientContext);
+
+  /** Client context associated with these options; excluded from JSON. */
+  @JsonIgnore
+  @Description("get client context to query application status")
+  ClientContext getClientContext();
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
new file mode 100644
index 0000000..bc27147
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * Result of executing a {@link Pipeline} with Gearpump.
+ *
+ * <p>This class carries no job information: the state is never tracked and aggregator
+ * values cannot be retrieved.
+ */
+public class GearpumpPipelineResult implements PipelineResult {
+
+  /**
+   * Returns the pipeline state.
+   *
+   * @return always {@link State#UNKNOWN}, because this class does not track the state of
+   *     the submitted application; a real enum value is returned instead of {@code null}
+   */
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  /**
+   * Always fails, since aggregator retrieval is not supported.
+   *
+   * @throws AggregatorRetrievalException always, with an
+   *     {@link UnsupportedOperationException} as its cause
+   */
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
+        new UnsupportedOperationException());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
new file mode 100644
index 0000000..660d703
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to Gearpump Stream DSL
+ * and then executing them on a Gearpump cluster.
+ * <p>>
+ * This is based on DataflowPipelineRunner.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+  private final GearpumpPipelineOptions options;
+
+  private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
+  private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+
+  public GearpumpPipelineRunner(GearpumpPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static GearpumpPipelineRunner fromOptions(PipelineOptions options) {
+    GearpumpPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+    return new GearpumpPipelineRunner(pipelineOptions);
+  }
+
+
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (Window.Bound.class.equals(transform.getClass())) {
+      return (OutputT) super.apply(
+          new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+    } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+        && ((PCollectionList<?>) input).size() == 0) {
+      return (OutputT) Pipeline.applyTransform(input, Create.of());
+    } else if (Create.Values.class.equals(transform.getClass())) {
+      return (OutputT) PCollection
+          .<OutputT>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              PCollection.IsBounded.BOUNDED);
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+  @Override
+  public GearpumpPipelineResult run(Pipeline pipeline) {
+    String appName = options.getApplicationName();
+    if (null == appName) {
+      appName = DEFAULT_APPNAME;
+    }
+    Config config = registerSerializers(ClusterConfig.defaultConfig(),
+        options.getSerializers());
+    ClientContext clientContext = getClientContext(options, config);
+    options.setClientContext(clientContext);
+    JavaStreamApp streamApp = new JavaStreamApp(
+        appName, clientContext, UserConfig.empty());
+    TranslationContext translationContext = new TranslationContext(streamApp, options);
+    GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
+    translator.translate(pipeline);
+    streamApp.run();
+
+    return null;
+  }
+
+  private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
+    EmbeddedCluster cluster = options.getEmbeddedCluster();
+    if (cluster != null) {
+      return cluster.newClientContext();
+    } else {
+      return ClientContext.apply(config);
+    }
+  }
+
+  /**
+   * register class with default kryo serializers.
+   */
+  private Config registerSerializers(Config config, Map<String, String> userSerializers) {
+    Map<String, String> serializers = new HashMap<>();
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
+    serializers.put("org.joda.time.Instant", "");
+    serializers.put("org.apache.beam.sdk.values.KV", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
+    serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+    if (userSerializers != null && !userSerializers.isEmpty()) {
+      serializers.putAll(userSerializers);
+    }
+    return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
+  }
+
+
+  /**
+   * copied from DirectPipelineRunner.
+   * used to replace Window.Bound till window function is added to Gearpump Stream DSL
+   */
+  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      if (!windowFn.isNonMerging()) {
+        throw new UnsupportedOperationException(
+            "merging window is not supported in Gearpump pipeline");
+      }
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
new file mode 100644
index 0000000..2b9e89e
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link GearpumpPipelineRunner}.
+ *
+ * <p>{@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public class GearpumpPipelineRunnerRegistrar {
+  // Holder for the nested registrars only; never instantiated.
+  private GearpumpPipelineRunnerRegistrar() { }
+
+  /**
+   * Registers the {@link GearpumpPipelineRunner} and the {@link TestGearpumpRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+
+    /**
+     * Lists the runner classes provided by this module. The production runner is listed
+     * alongside the test runner so that both are registered.
+     */
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          GearpumpPipelineRunner.class,
+          TestGearpumpRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link GearpumpPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+
+    /** Lists the options interfaces provided by this module. */
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
new file mode 100644
index 0000000..59f0df7
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
+import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
+import org.apache.beam.runners.gearpump.translators.TransformTranslator;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.util.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link GearpumpPipelineTranslator} translates {@link Pipeline} objects into a Gearpump
+ * {@link Graph} by visiting the pipeline and dispatching each primitive transform to the
+ * {@link TransformTranslator} registered for its class.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GearpumpPipelineTranslator.class);
+
+  /**
+   * Lookup table from a {@link PTransform} subclass to the {@link TransformTranslator}
+   * responsible for translating instances of that subclass.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      TRANSLATORS_BY_TRANSFORM = new HashMap<>();
+
+  static {
+    // populate the lookup table with the supported primitive transforms
+    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(
+        Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+  }
+
+  private final TranslationContext translationContext;
+
+  /**
+   * Creates a translator that passes the given context to every {@link TransformTranslator}.
+   */
+  public GearpumpPipelineTranslator(TranslationContext translationContext) {
+    this.translationContext = translationContext;
+  }
+
+  /**
+   * Translates the pipeline by traversing it topologically with this visitor.
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    // composites are only logged; their contents are always visited
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    PTransform primitive = node.getTransform();
+    LOG.debug("visiting transform {}", primitive);
+
+    TransformTranslator handler = getTransformTranslator(primitive.getClass());
+    if (handler != null) {
+      translationContext.setCurrentTransform(node);
+      handler.translate(primitive, translationContext);
+      return;
+    }
+    throw new IllegalStateException(
+        "no translator registered for " + primitive);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Associates the given PTransform class with the {@link TransformTranslator} that
+   * should translate its instances.
+   *
+   * @throws IllegalArgumentException if a translator was already registered for the class
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    TransformTranslator previous =
+        TRANSLATORS_BY_TRANSFORM.put(transformClass, transformTranslator);
+    if (previous != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Looks up the {@link TransformTranslator} registered for exactly the given
+   * PTransform class; yields null when nothing is registered for it.
+   */
+  private <TransformT extends PTransform> TransformTranslator<TransformT> getTransformTranslator(
+      Class<TransformT> transformClass) {
+    return TRANSLATORS_BY_TRANSFORM.get(transformClass);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
new file mode 100644
index 0000000..cedd31f
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+/**
+ * Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}.
+ *
+ * <p>An embedded cluster is started when the runner is created and stopped at the end of
+ * {@link #run(Pipeline)}; the pipeline work itself is delegated to a
+ * {@link GearpumpPipelineRunner}.
+ */
+public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+  private final GearpumpPipelineRunner delegate;
+  private final EmbeddedCluster cluster;
+
+  private TestGearpumpRunner(GearpumpPipelineOptions options) {
+    cluster = EmbeddedCluster.apply();
+    cluster.start();
+    // The cluster must be set on the options before the delegate is built from them.
+    options.setEmbeddedCluster(cluster);
+    delegate = GearpumpPipelineRunner.fromOptions(options);
+  }
+
+  /**
+   * Creates a test runner from the given options.
+   *
+   * @param options pipeline options, validated as {@link GearpumpPipelineOptions}
+   * @return a runner backed by a freshly started {@link EmbeddedCluster}
+   */
+  public static TestGearpumpRunner fromOptions(PipelineOptions options) {
+    GearpumpPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+    return new TestGearpumpRunner(pipelineOptions);
+  }
+
+  /**
+   * Runs the pipeline on the delegate runner and then stops the embedded cluster.
+   *
+   * @param pipeline the pipeline to execute
+   * @return the result produced by the delegate runner
+   */
+  @Override
+  public GearpumpPipelineResult run(Pipeline pipeline) {
+    try {
+      return delegate.run(pipeline);
+    } finally {
+      // Stop the embedded cluster even when the delegate throws, so a failed
+      // pipeline does not leave the cluster running.
+      cluster.stop();
+    }
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+  OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
new file mode 100644
index 0000000..c51289d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Streaming word count example on the Gearpump runner.
+ */
+public class StreamingWordCount {
+
+  /**
+   * Splits each input line into words and outputs every non-empty word.
+   * Lines that are empty after trimming are counted in the "emptyLines" aggregator.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words; anything other than letters and apostrophes separates words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Formats each word count as {@code "<word> - <count> @ <timestamp>"}, logs it at debug
+   * level and outputs it.
+   */
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey()
+          + " - " + c.element().getValue()
+          + " @ " + c.timestamp().toString();
+      LOG.debug("output {}", row);
+      c.output(row);
+    }
+  }
+
+
+  /**
+   * Builds and runs the word count pipeline: read lines, extract words, window them into
+   * fixed 10-second windows, count per word and format the counts.
+   *
+   * @param args command-line arguments parsed into {@link GearpumpPipelineOptions}
+   */
+  public static void main(String[] args) {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(GearpumpPipelineOptions.class);
+    options.setApplicationName("StreamingWordCount");
+    options.setRunner(GearpumpPipelineRunner.class);
+    options.setParallelism(1);
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> wordCounts =
+        p.apply(Read.from(new UnboundedTextSource()))
+            .apply(ParDo.of(new ExtractWordsFn()))
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
+
+    try {
+      p.run();
+    } finally {
+      // Close the client context even if the run fails. The null guard keeps a
+      // missing context from masking the original failure with an NPE.
+      ClientContext clientContext = options.getClientContext();
+      if (clientContext != null) {
+        clientContext.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
new file mode 100644
index 0000000..caf066c
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unbounded source that endlessly emits a fixed set of sample text lines.
+ *
+ * <p>The source is never split and does not support checkpointing: both the checkpoint mark
+ * coder and the reader's checkpoint mark are {@code null}.
+ */
+public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
+
+  /**
+   * Returns this source as the only split, regardless of {@code desiredNumSplits}.
+   */
+  @Override
+  public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
+  }
+
+  /**
+   * Creates a new reader; the checkpoint mark is ignored.
+   */
+  @Override
+  public UnboundedReader<String> createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    return new UnboundedTextReader(this);
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  @Override
+  public void validate() {
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  /**
+   * Reader that cycles through the sample lines, stamping the record at position
+   * {@code index} with a timestamp of {@code index * 1000} milliseconds. The watermark is the
+   * timestamp of the current record.
+   */
+  public static class UnboundedTextReader extends UnboundedReader<String> implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+
+    private final UnboundedTextSource source;
+
+    private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"};
+    private long index = 0;
+
+    private String currentRecord;
+
+    private Instant currentTimestamp;
+
+    public UnboundedTextReader(UnboundedTextSource source) {
+      this.source = source;
+    }
+
+    /**
+     * Positions the reader on the first sample line with timestamp 0.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean start() throws IOException {
+      currentRecord = texts[0];
+      currentTimestamp = new Instant(0);
+      return true;
+    }
+
+    /**
+     * Moves to the next sample line, wrapping around at the end of the array.
+     *
+     * @return always {@code true}; the source never runs out of records
+     */
+    @Override
+    public boolean advance() throws IOException {
+      index++;
+      // Reduce modulo the array length BEFORE narrowing to int. Casting first
+      // would turn an index beyond Integer.MAX_VALUE into a negative array index.
+      currentRecord = texts[(int) (index % texts.length)];
+      currentTimestamp = new Instant(index * 1000);
+
+      return true;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      return new byte[0];
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return currentTimestamp;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return currentTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<String, ?> getCurrentSource() {
+      return this.source;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
new file mode 100644
index 0000000..452127a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Wraps elements from Create.Values into an {@link UnboundedSource}.
+ * Mainly used for tests.
+ */
+public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+
+  /**
+   * Builds a source stream from the elements of {@code transform} and registers it as the
+   * output stream of the transform.
+   *
+   * @param transform the {@link Create.Values} whose elements become the source data
+   * @param context translation state supplying the pipeline options, the transform's
+   *     input and output, and the source stream
+   * @throws RuntimeException wrapping the {@link CannotProvideCoderException} raised when the
+   *     transform's default output coder cannot be provided
+   */
+  @Override
+  public void translate(Create.Values<T> transform, TranslationContext context) {
+    try {
+      // The elements and their default output coder go into a ValuesSource, which is then
+      // wrapped together with the pipeline options.
+      UnboundedSourceWrapper<T, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+          new ValuesSource<>(transform.getElements(),
+              transform.getDefaultOutputCoder(context.getInput(transform))),
+          context.getPipelineOptions());
+      JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
+      context.setOutputStream(context.getOutput(transform), sourceStream);
+    } catch (CannotProvideCoderException e) {
+      // Rethrown unchecked with the cause preserved; translate() declares no checked exception.
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..b06d5a8
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
+ * Note only two-way merge is working now.
+ */
+public class FlattenPCollectionTranslator<T> implements
+    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+
+  /**
+   * Merges the streams of all input collections, in list order, into a single stream and
+   * registers it as the output stream of the transform.
+   *
+   * @param transform the flatten transform being translated; its name labels each merge
+   * @param context translation state supplying the input collections and their streams
+   */
+  @Override
+  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+    JavaStream<T> merged = null;
+    for (PCollection<T> collection : context.getInput(transform).getAll()) {
+      JavaStream<T> inputStream = context.getInputStream(collection);
+      if (null == merged) {
+        // The first input seeds the result; later inputs are merged into it.
+        merged = inputStream;
+      } else {
+        merged = merged.merge(inputStream, transform.getName());
+      }
+    }
+    // NOTE(review): merged is still null here when the input list is empty; confirm that
+    // setOutputStream tolerates a null stream.
+    context.setOutputStream(context.getOutput(transform), merged);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
new file mode 100644
index 0000000..f36b908
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link GroupByKey} is translated to Gearpump groupBy function.
+ *
+ * <p>The translation is a four-step chain: re-key every element by (key, window), group by
+ * that pair, turn each value into a single-element iterable, and reduce the iterables of a
+ * group by concatenation.
+ */
+public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  /**
+   * Builds the grouping chain on the transform's input stream and registers the result as the
+   * transform's output stream.
+   *
+   * @param transform the {@link GroupByKey} being translated
+   * @param context translation state supplying the input stream and the pipeline options,
+   *     whose parallelism setting is passed to the groupBy step
+   */
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    JavaStream<WindowedValue<KV<K, V>>> inputStream =
+        context.getInputStream(context.getInput(transform));
+    int parallelism = context.getPipelineOptions().getParallelism();
+    JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
+        .flatMap(new KeyedByKeyAndWindow<K, V>(), "keyed_by_Key_and_Window")
+        .groupBy(new GroupByKeyAndWindow<K, V>(), parallelism, "group_by_Key_and_Window")
+        .map(new ExtractKeyValue<K, V>(), "extract_Key_and_Value")
+        .reduce(new MergeValue<K, V>(), "merge_value");
+
+    context.setOutputStream(context.getOutput(transform), outputStream);
+  }
+
+  /**
+   * Emits one element per window of the input value. Each emitted element is keyed by the
+   * (key, window) pair, is assigned to that single window, and keeps the input's timestamp
+   * and pane.
+   */
+  private static class KeyedByKeyAndWindow<K, V> implements
+      FlatMapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<KV<K, BoundedWindow>, V>>> {
+
+    @Override
+    public Iterator<WindowedValue<KV<KV<K, BoundedWindow>, V>>> apply(WindowedValue<KV<K, V>> wv) {
+      List<WindowedValue<KV<KV<K, BoundedWindow>, V>>> ret = new ArrayList<>(wv.getWindows().size
+          ());
+      for (BoundedWindow window : wv.getWindows()) {
+        KV<K, BoundedWindow> keyWin = KV.of(wv.getValue().getKey(), window);
+        ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()),
+            wv.getTimestamp(), window, wv.getPane()));
+      }
+      return ret.iterator();
+    }
+  }
+
+  /**
+   * Uses the (key, window) pair of an element as its grouping key.
+   */
+  private static class GroupByKeyAndWindow<K, V> implements
+      GroupByFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, KV<K, BoundedWindow>> {
+
+    @Override
+    public KV<K, BoundedWindow> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
+      return wv.getValue().getKey();
+    }
+  }
+
+  /**
+   * Strips the window from the composite key and wraps the value in a single-element list,
+   * keeping the element's timestamp, windows and pane.
+   */
+  private static class ExtractKeyValue<K, V> implements
+      MapFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>,
+          WindowedValue<KV<K, Iterable<V>>>> {
+    @Override
+    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
+      return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(),
+              (Iterable<V>) Collections.singletonList(wv.getValue().getValue())),
+          wv.getTimestamp(), wv.getWindows(), wv.getPane());
+    }
+  }
+
+  /**
+   * Combines two grouped elements by concatenating their value iterables. The key, timestamp,
+   * windows and pane of the result are taken from the first argument.
+   */
+  private static class MergeValue<K, V> implements
+      ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+    @Override
+    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
+        WindowedValue<KV<K, Iterable<V>>> wv2) {
+      return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+              Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
+          wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..af5bcbc
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to a Gearpump flatMap function
+ * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. Each emitted element
+ * is paired with its output tag; the stream for every output is then obtained
+ * by filtering on that tag with a Gearpump filter function.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ParDoBoundMultiTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+
+  /**
+   * Builds the tagged flatMap stream for {@code transform} and registers one
+   * filtered stream per output {@link PCollection} with the context.
+   *
+   * @param transform the multi-output ParDo to translate
+   * @param context translation state used to look up the input stream and to
+   *     record the output streams
+   */
+  @Override
+  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+    PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
+    JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
+    Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+
+    JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
+        new DoFnMultiFunction<>(
+            context.getPipelineOptions(),
+            transform.getFn(),
+            transform.getMainOutputTag(),
+            transform.getSideOutputTags(),
+            inputT.getWindowingStrategy(),
+            new NoOpSideInputReader()
+        ), transform.getName());
+    // Every output PCollection gets its own view of the tagged stream:
+    // keep only the elements carrying its tag, then drop the tag again.
+    for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+      JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
+          .filter(new FilterByOutputTag<>((TupleTag<OutputT>) output.getKey()),
+              "filter_by_output_tag")
+          .map(new ExtractOutput<OutputT>(), "extract output");
+
+      context.setOutputStream(output.getValue(), taggedStream);
+    }
+  }
+
+  /**
+   * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}. The function
+   * also serves as the {@link DoFnRunners.OutputManager} of its runner, buffering
+   * every emitted element together with the tag it was emitted to.
+   */
+  private static class DoFnMultiFunction<InputT, OutputT> implements
+      FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
+      DoFnRunners.OutputManager {
+
+    private final DoFnRunner<InputT, OutputT> doFnRunner;
+    // Replaced on every apply() call so that it only ever holds the outputs
+    // of the element currently being processed.
+    private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
+        .newArrayList();
+
+    public DoFnMultiFunction(
+        GearpumpPipelineOptions pipelineOptions,
+        DoFn<InputT, OutputT> doFn,
+        TupleTag<OutputT> mainOutputTag,
+        TupleTagList sideOutputTags,
+        WindowingStrategy<?, ?> windowingStrategy,
+        SideInputReader sideInputReader) {
+      this.doFnRunner = new GearpumpDoFnRunner<>(
+          pipelineOptions,
+          doFn,
+          sideInputReader,
+          this,
+          mainOutputTag,
+          sideOutputTags.getAll(),
+          new NoOpStepContext(),
+          windowingStrategy
+      );
+    }
+
+    /**
+     * Runs the wrapped {@link DoFn} on a single element, treated as a bundle of
+     * its own, and returns the tagged outputs emitted for that element only.
+     */
+    @Override
+    public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+      // Start from a fresh buffer: without this, outputs of earlier elements
+      // would be returned again on every call and the list would grow forever.
+      // A new list (rather than clear()) keeps previously returned iterators valid.
+      outputs = Lists.newArrayList();
+
+      doFnRunner.startBundle();
+      doFnRunner.processElement(wv);
+      doFnRunner.finishBundle();
+
+      return outputs.iterator();
+    }
+
+    /**
+     * Buffers an emitted element as a (tag, value) pair, keeping the timestamp,
+     * windows and pane of the original output.
+     */
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      KV<TupleTag<OutputT>, OutputT> kv = KV.of((TupleTag<OutputT>) tag,
+          (OutputT) output.getValue());
+      outputs.add(WindowedValue.of(kv, output.getTimestamp(),
+          output.getWindows(), output.getPane()));
+    }
+  }
+
+  /**
+   * {@link FilterFunction} that keeps only the elements whose tag equals the
+   * tag given at construction.
+   */
+  private static class FilterByOutputTag<OutputT> implements
+      FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
+
+    private final TupleTag<OutputT> tupleTag;
+
+    public FilterByOutputTag(TupleTag<OutputT> tupleTag) {
+      this.tupleTag = tupleTag;
+    }
+
+    @Override
+    public boolean apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+      return wv.getValue().getKey().equals(tupleTag);
+    }
+  }
+
+  /**
+   * {@link MapFunction} that strips the tag from a (tag, value) element while
+   * keeping its timestamp, windows and pane.
+   */
+  private static class ExtractOutput<OutputT> implements
+      MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
+
+    @Override
+    public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+      return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
+          wv.getWindows(), wv.getPane());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
new file mode 100644
index 0000000..689bc08
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
+/**
+ * Translates {@link ParDo.Bound} into a Gearpump flatMap step whose function
+ * is the transform's {@link DoFn} wrapped in a {@link DoFnFunction}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+
+  @Override
+  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+    PCollection<OutputT> outputCollection = context.getOutput(transform);
+    WindowingStrategy<?, ?> strategy = outputCollection.getWindowingStrategy();
+    DoFn<InputT, OutputT> fn = transform.getFn();
+
+    JavaStream<WindowedValue<InputT>> upstream =
+        context.getInputStream(context.getInput(transform));
+    DoFnFunction<InputT, OutputT> wrappedFn = new DoFnFunction<>(
+        context.getPipelineOptions(), fn, strategy, new NoOpSideInputReader());
+    JavaStream<WindowedValue<OutputT>> downstream =
+        upstream.flatMap(wrappedFn, transform.getName());
+
+    // Publish the result under the transform's output collection.
+    context.setOutputStream(outputCollection, downstream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
new file mode 100644
index 0000000..478d58f
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * Translates {@link Read.Bounded} into a Gearpump source stream. The
+ * transform's {@link BoundedSource} is wrapped in a {@link BoundedSourceWrapper},
+ * which is handed to the context as the Gearpump {@link DataSource}.
+ */
+public class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+
+  @Override
+  public void translate(Read.Bounded<T> transform, TranslationContext context) {
+    BoundedSource<T> source = transform.getSource();
+    BoundedSourceWrapper<T> wrappedSource =
+        new BoundedSourceWrapper<>(source, context.getPipelineOptions());
+    JavaStream<WindowedValue<T>> stream = context.getSourceStream(wrappedSource);
+    context.setOutputStream(context.getOutput(transform), stream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..7e12a9c
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * Translates {@link Read.Unbounded} into a Gearpump source stream. The
+ * transform's {@link UnboundedSource} is wrapped in an
+ * {@link UnboundedSourceWrapper}, which is handed to the context as the
+ * Gearpump {@link DataSource}.
+ */
+public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> source = transform.getSource();
+    UnboundedSourceWrapper<T, ?> wrappedSource =
+        new UnboundedSourceWrapper<>(source, context.getPipelineOptions());
+    JavaStream<WindowedValue<T>> stream = context.getSourceStream(wrappedSource);
+    context.setOutputStream(context.getOutput(transform), stream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
new file mode 100644
index 0000000..1ed6d5d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.io.Serializable;
+
+/**
+ * Translates a {@link PTransform} to Gearpump functions.
+ *
+ * <p>{@code translate} receives the transform to convert together with the
+ * {@link TranslationContext} shared by all translators. Translators are
+ * {@link Serializable}.
+ *
+ * @param <T> the type of {@link PTransform} this translator handles
+ */
+public interface TransformTranslator<T extends PTransform> extends Serializable {
+  void translate(T transform, TranslationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
new file mode 100644
index 0000000..b9b2c7a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.source.DataSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TranslationContext {
+
+  private final JavaStreamApp streamApp;
+  private final GearpumpPipelineOptions pipelineOptions;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PValue, JavaStream<?>> streams = new HashMap<>();
+
+  public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
+    this.streamApp = streamApp;
+    this.pipelineOptions = pipelineOptions;
+
+  }
+
+  public void setCurrentTransform(TransformTreeNode treeNode) {
+    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+  }
+
+  public GearpumpPipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  public <InputT> JavaStream<InputT> getInputStream(PValue input) {
+    return (JavaStream<InputT>) streams.get(input);
+  }
+
+  public <OutputT> void setOutputStream(PValue output, JavaStream<OutputT> outputStream) {
+    if (!streams.containsKey(output)) {
+      streams.put(output, outputStream);
+    }
+  }
+
+  public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
+    return (InputT) getCurrentTransform(transform).getInput();
+  }
+
+  public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
+    return (OutputT) getCurrentTransform(transform).getOutput();
+  }
+
+  private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
+    checkArgument(
+        currentTransform != null && currentTransform.getTransform() == transform,
+        "can only be called with current transform");
+    return currentTransform;
+  }
+
+  public <T> JavaStream<T> getSourceStream(DataSource dataSource) {
+    return streamApp.source(dataSource, pipelineOptions.getParallelism(),
+        UserConfig.empty(), "source");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
new file mode 100644
index 0000000..088fc14
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.functions;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.client.util.Lists;
+
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Adapts a Beam {@link DoFn} to Gearpump's {@link FlatMapFunction}. The function
+ * registers itself as the {@link DoFnRunners.OutputManager} of its runner, so
+ * elements emitted while processing an input are buffered and then returned
+ * from {@code apply}.
+ */
+public class DoFnFunction<InputT, OutputT> implements
+    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+
+  private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {
+  };
+  private final DoFnRunner<InputT, OutputT> doFnRunner;
+  private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+
+  public DoFnFunction(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      SideInputReader sideInputReader) {
+    this.doFnRunner = new GearpumpDoFnRunner<>(
+        pipelineOptions,
+        doFn,
+        sideInputReader,
+        this,
+        mainTag,
+        TupleTagList.empty().getAll(),
+        new NoOpStepContext(),
+        windowingStrategy
+    );
+  }
+
+  /**
+   * Processes one element as a bundle of its own and returns an iterator over
+   * the outputs emitted for it.
+   */
+  @Override
+  public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
+    // Fresh buffer per element; output() appends to it through the field.
+    List<WindowedValue<OutputT>> collected = Lists.newArrayList();
+    outputs = collected;
+
+    doFnRunner.startBundle();
+    doFnRunner.processElement(value);
+    doFnRunner.finishBundle();
+
+    return collected.iterator();
+  }
+
+  /**
+   * Buffers an element emitted to the main tag; any other tag is rejected with
+   * a {@link RuntimeException}.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    if (!mainTag.equals(tag)) {
+      throw new RuntimeException("output is not of main tag");
+    }
+    outputs.add((WindowedValue<OutputT>) output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
new file mode 100644
index 0000000..f25d113
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.IOException;
+
+/**
+ * {@link GearpumpSource} backed by a Beam {@link BoundedSource}; readers are
+ * obtained directly from the wrapped source.
+ */
+public class BoundedSourceWrapper<T> extends GearpumpSource<T> {
+
+  private final BoundedSource<T> boundedSource;
+
+  public BoundedSourceWrapper(BoundedSource<T> source, PipelineOptions options) {
+    super(options);
+    this.boundedSource = source;
+  }
+
+  /** Delegates reader creation to the wrapped {@link BoundedSource}. */
+  @Override
+  protected Source.Reader<T> createReader(PipelineOptions options) throws IOException {
+    return boundedSource.createReader(options);
+  }
+}