You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/20 16:11:29 UTC
[2/3] incubator-beam git commit: [BEAM-79] add Gearpump runner
[BEAM-79] add Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9478f411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9478f411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9478f411
Branch: refs/heads/gearpump-runner
Commit: 9478f4117de3a2d0ea40614ed4cb801918610724
Parents: eb69e10
Author: manuzhang <ow...@gmail.com>
Authored: Tue Mar 15 16:15:16 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jul 20 08:42:52 2016 +0800
----------------------------------------------------------------------
runners/gearpump/README.md | 22 +
runners/gearpump/pom.xml | 296 +++++++++++
.../gearpump/GearpumpPipelineOptions.java | 67 +++
.../gearpump/GearpumpPipelineResult.java | 42 ++
.../gearpump/GearpumpPipelineRunner.java | 193 +++++++
.../GearpumpPipelineRunnerRegistrar.java | 63 +++
.../gearpump/GearpumpPipelineTranslator.java | 139 +++++
.../runners/gearpump/TestGearpumpRunner.java | 64 +++
.../gearpump/examples/StreamingWordCount.java | 105 ++++
.../gearpump/examples/UnboundedTextSource.java | 138 +++++
.../translators/CreateValuesTranslator.java | 49 ++
.../FlattenPCollectionTranslator.java | 47 ++
.../translators/GroupByKeyTranslator.java | 103 ++++
.../translators/ParDoBoundMultiTranslator.java | 154 ++++++
.../translators/ParDoBoundTranslator.java | 54 ++
.../translators/ReadBoundedTranslator.java | 44 ++
.../translators/ReadUnboundedTranslator.java | 46 ++
.../translators/TransformTranslator.java | 31 ++
.../translators/TranslationContext.java | 95 ++++
.../translators/functions/DoFnFunction.java | 88 ++++
.../translators/io/BoundedSourceWrapper.java | 44 ++
.../gearpump/translators/io/GearpumpSource.java | 100 ++++
.../translators/io/UnboundedSourceWrapper.java | 45 ++
.../gearpump/translators/io/ValuesSource.java | 164 ++++++
.../translators/utils/GearpumpDoFnRunner.java | 513 +++++++++++++++++++
.../translators/utils/NoOpSideInputReader.java | 48 ++
.../translators/utils/NoOpStepContext.java | 71 +++
runners/pom.xml | 11 +
.../apache/beam/sdk/testing/TestPipeline.java | 2 +
29 files changed, 2838 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/README.md
----------------------------------------------------------------------
diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
new file mode 100644
index 0000000..ad043fa
--- /dev/null
+++ b/runners/gearpump/README.md
@@ -0,0 +1,22 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Gearpump Beam Runner
+
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
new file mode 100644
index 0000000..c725dae
--- /dev/null
+++ b/runners/gearpump/pom.xml
@@ -0,0 +1,296 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-parent</artifactId>
+ <version>0.2.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-runners-gearpump</artifactId>
+
+ <name>Apache Beam :: Runners :: Gearpump</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <gearpump.version>0.8.1-SNAPSHOT</gearpump.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>gearpump-shaded-repo</id>
+ <name>Vincent at Bintray</name>
+ <url>http://dl.bintray.com/fvunicorn/maven</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gearpump</groupId>
+ <artifactId>gearpump-streaming_2.11</artifactId>
+ <version>${gearpump.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gearpump</groupId>
+ <artifactId>gearpump-core_2.11</artifactId>
+ <version>${gearpump.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gearpump</groupId>
+ <artifactId>gearpump-daemon_2.11</artifactId>
+ <version>${gearpump.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.gearpump</groupId>
+ <artifactId>gearpump-experimental-cgroup_2.11</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <scope>provided</scope>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.11.8</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>1.0-rc2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- JAR Packaging -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <!-- Java compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <!-- Integration Tests -->
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <forkCount>1</forkCount>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </plugin>
+
+ <!-- Unit Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <id>runnable-on-service-tests</id>
+ <configuration>
+ <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <!-- artifacts listed here must match declared dependencies (see beam-runners-core-java above) -->
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ <dependency>org.apache.beam:beam-runners-core-java</dependency>
+ </dependenciesToScan>
+ <excludes>
+ <!-- side input is not supported in Gearpump -->
+ <exclude>
+ org.apache.beam.sdk.io.BigQueryIOTest,
+ org.apache.beam.sdk.io.CountingInputTest,
+ org.apache.beam.sdk.io.CountingSourceTest,
+ org.apache.beam.sdk.testing.PAssertTest,
+ org.apache.beam.sdk.transforms.ApproximateUniqueTest,
+ org.apache.beam.sdk.transforms.CombineTest,
+ org.apache.beam.sdk.transforms.CombineFnsTest,
+ org.apache.beam.sdk.transforms.CountTest,
+ org.apache.beam.sdk.transforms.FlattenTest,
+ org.apache.beam.sdk.transforms.ParDoTest,
+ org.apache.beam.sdk.transforms.SampleTest,
+ org.apache.beam.sdk.transforms.ViewTest,
+ org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
+ </exclude>
+ <!-- merging windows is not supported in Gearpump -->
+ <exclude>
+ org.apache.beam.sdk.transforms.windowing.WindowingTest,
+ org.apache.beam.sdk.util.ReshuffleTest
+ </exclude>
+ </excludes>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestGearpumpRunner",
+ "--streaming=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- uber jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
new file mode 100644
index 0000000..5b6ee96
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+import java.util.Map;
+
+/**
+ * Options that configure the Gearpump pipeline.
+ *
+ * <p>Following the {@link PipelineOptions} convention, annotations such as
+ * {@link Description} and {@link Default} are placed on the getters.
+ */
+public interface GearpumpPipelineOptions extends PipelineOptions {
+
+  void setApplicationName(String name);
+
+  @Description("set unique application name for Gearpump runner")
+  String getApplicationName();
+
+  void setParallelism(int parallelism);
+
+  @Description("set parallelism for Gearpump processor")
+  @Default.Integer(1)
+  int getParallelism();
+
+  void setSerializers(Map<String, String> serializers);
+
+  /** Extra Kryo serializer registrations, keyed by fully-qualified class name. */
+  @Description("register Kryo serializers")
+  @JsonIgnore
+  Map<String, String> getSerializers();
+
+  void setEmbeddedCluster(EmbeddedCluster cluster);
+
+  @Description("set EmbeddedCluster for tests")
+  @JsonIgnore
+  EmbeddedCluster getEmbeddedCluster();
+
+  void setClientContext(ClientContext clientContext);
+
+  @JsonIgnore
+  @Description("get client context to query application status")
+  ClientContext getClientContext();
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
new file mode 100644
index 0000000..bc27147
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * Result of executing a {@link Pipeline} with Gearpump.
+ *
+ * <p>Application state tracking and aggregators are not yet supported:
+ * {@link #getState()} always reports {@link State#UNKNOWN} and
+ * {@link #getAggregatorValues(Aggregator)} always throws.
+ */
+public class GearpumpPipelineResult implements PipelineResult {
+
+  /**
+   * Returns {@link State#UNKNOWN}, because this runner does not yet track
+   * the state of the submitted Gearpump application. Never {@code null}.
+   */
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  /**
+   * Aggregator retrieval is not supported by the Gearpump runner.
+   *
+   * @throws AggregatorRetrievalException always
+   */
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
+        new UnsupportedOperationException());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
new file mode 100644
index 0000000..660d703
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to Gearpump Stream DSL
+ * and then executing them on a Gearpump cluster.
+ * <p>>
+ * This is based on DataflowPipelineRunner.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+ private final GearpumpPipelineOptions options;
+
+ private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
+ private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+
+ public GearpumpPipelineRunner(GearpumpPipelineOptions options) {
+ this.options = options;
+ }
+
+ public static GearpumpPipelineRunner fromOptions(PipelineOptions options) {
+ GearpumpPipelineOptions pipelineOptions =
+ PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+ return new GearpumpPipelineRunner(pipelineOptions);
+ }
+
+
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ if (Window.Bound.class.equals(transform.getClass())) {
+ return (OutputT) super.apply(
+ new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+ } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+ && ((PCollectionList<?>) input).size() == 0) {
+ return (OutputT) Pipeline.applyTransform(input, Create.of());
+ } else if (Create.Values.class.equals(transform.getClass())) {
+ return (OutputT) PCollection
+ .<OutputT>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED);
+ } else {
+ return super.apply(transform, input);
+ }
+ }
+
+ @Override
+ public GearpumpPipelineResult run(Pipeline pipeline) {
+ String appName = options.getApplicationName();
+ if (null == appName) {
+ appName = DEFAULT_APPNAME;
+ }
+ Config config = registerSerializers(ClusterConfig.defaultConfig(),
+ options.getSerializers());
+ ClientContext clientContext = getClientContext(options, config);
+ options.setClientContext(clientContext);
+ JavaStreamApp streamApp = new JavaStreamApp(
+ appName, clientContext, UserConfig.empty());
+ TranslationContext translationContext = new TranslationContext(streamApp, options);
+ GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
+ translator.translate(pipeline);
+ streamApp.run();
+
+ return null;
+ }
+
+ private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
+ EmbeddedCluster cluster = options.getEmbeddedCluster();
+ if (cluster != null) {
+ return cluster.newClientContext();
+ } else {
+ return ClientContext.apply(config);
+ }
+ }
+
+ /**
+ * register class with default kryo serializers.
+ */
+ private Config registerSerializers(Config config, Map<String, String> userSerializers) {
+ Map<String, String> serializers = new HashMap<>();
+ serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
+ serializers.put("org.joda.time.Instant", "");
+ serializers.put("org.apache.beam.sdk.values.KV", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
+ serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+ if (userSerializers != null && !userSerializers.isEmpty()) {
+ serializers.putAll(userSerializers);
+ }
+ return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
+ }
+
+
+ /**
+ * copied from DirectPipelineRunner.
+ * used to replace Window.Bound till window function is added to Gearpump Stream DSL
+ */
+ private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private final Window.Bound<T> wrapped;
+
+ public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+ WindowFn<T, BoundedWindow> windowFn =
+ (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+ if (!windowFn.isNonMerging()) {
+ throw new UnsupportedOperationException(
+ "merging window is not supported in Gearpump pipeline");
+ }
+
+ // If the Window.Bound transform only changed parts other than the WindowFn, then
+ // we skip AssignWindows even though it should be harmless in a perfect world.
+ // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+ // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+ // AssignWindows in this case.
+ if (wrapped.getWindowFn() == null) {
+ return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+ .setWindowingStrategyInternal(outputStrategy);
+ } else {
+ return input
+ .apply("AssignWindows", new AssignWindows<>(windowFn))
+ .setWindowingStrategyInternal(outputStrategy);
+ }
+ }
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
new file mode 100644
index 0000000..2b9e89e
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link GearpumpPipelineRunner}.
+ *
+ * <p>{@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public class GearpumpPipelineRunnerRegistrar {
+  private GearpumpPipelineRunnerRegistrar() { }
+
+  /**
+   * Registers the {@link GearpumpPipelineRunner} and the {@link TestGearpumpRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      // Both runners are registered so that either can be selected by name via --runner.
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          GearpumpPipelineRunner.class,
+          TestGearpumpRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link GearpumpPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
new file mode 100644
index 0000000..59f0df7
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
+import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
+import org.apache.beam.runners.gearpump.translators.TransformTranslator;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.util.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
+ * into Gearpump {@link Graph}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GearpumpPipelineTranslator.class);
+
+ /**
+ * A map from {@link PTransform} subclass to the corresponding
+ * {@link TransformTranslator} to use to translate that transform.
+ */
+ private static final Map<Class<? extends PTransform>, TransformTranslator>
+ transformTranslators = new HashMap<>();
+
+ private final TranslationContext translationContext;
+
+ static {
+ // register TransformTranslators
+ registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+ registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+ registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+ registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+ registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+ new FlattenPCollectionTranslator());
+ registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+ registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+ }
+
+ public GearpumpPipelineTranslator(TranslationContext translationContext) {
+ this.translationContext = translationContext;
+ }
+
+ public void translate(Pipeline pipeline) {
+ pipeline.traverseTopologically(this);
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+ LOG.debug("entering composite transform {}", node.getTransform());
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ LOG.debug("leaving composite transform {}", node.getTransform());
+ }
+
+ @Override
+ public void visitPrimitiveTransform(TransformTreeNode node) {
+ LOG.debug("visiting transform {}", node.getTransform());
+ PTransform transform = node.getTransform();
+ TransformTranslator translator = getTransformTranslator(transform.getClass());
+ if (null == translator) {
+ throw new IllegalStateException(
+ "no translator registered for " + transform);
+ }
+ translationContext.setCurrentTransform(node);
+ translator.translate(transform, translationContext);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ LOG.debug("visiting value {}", value);
+ }
+
+ /**
+ * Records that instances of the specified PTransform class
+ * should be translated by default by the corresponding
+ * {@link TransformTranslator}.
+ */
+ private static <TransformT extends PTransform> void registerTransformTranslator(
+ Class<TransformT> transformClass,
+ TransformTranslator<? extends TransformT> transformTranslator) {
+ if (transformTranslators.put(transformClass, transformTranslator) != null) {
+ throw new IllegalArgumentException(
+ "defining multiple translators for " + transformClass);
+ }
+ }
+
+ /**
+ * Returns the {@link TransformTranslator} to use for instances of the
+ * specified PTransform class, or null if none registered.
+ */
+ private <TransformT extends PTransform>
+ TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+ return transformTranslators.get(transformClass);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
new file mode 100644
index 0000000..cedd31f
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+/**
+ * Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}.
+ */
+public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+ private final GearpumpPipelineRunner delegate;
+ private final EmbeddedCluster cluster;
+
+ private TestGearpumpRunner(GearpumpPipelineOptions options) {
+ cluster = EmbeddedCluster.apply();
+ cluster.start();
+ options.setEmbeddedCluster(cluster);
+ delegate = GearpumpPipelineRunner.fromOptions(options);
+ }
+
+ public static TestGearpumpRunner fromOptions(PipelineOptions options) {
+ GearpumpPipelineOptions pipelineOptions =
+ PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+ return new TestGearpumpRunner(pipelineOptions);
+ }
+
+ @Override
+ public GearpumpPipelineResult run(Pipeline pipeline) {
+ GearpumpPipelineResult result = delegate.run(pipeline);
+ cluster.stop();
+ return result;
+ }
+
+ @Override
+ public <OutputT extends POutput, InputT extends PInput>
+ OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
+ return delegate.apply(transform, input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
new file mode 100644
index 0000000..c51289d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
import org.apache.gearpump.cluster.client.ClientContext;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.regex.Pattern;
+
+
+/**
+ * streaming word count example on Gearpump runner.
+ */
+public class StreamingWordCount {
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String row = c.element().getKey()
+ + " - " + c.element().getValue()
+ + " @ " + c.timestamp().toString();
+ LOG.debug("output {}", row);
+ c.output(row);
+ }
+ }
+
+
+ public static void main(String[] args) {
+ GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(GearpumpPipelineOptions.class);
+ options.setApplicationName("StreamingWordCount");
+ options.setRunner(GearpumpPipelineRunner.class);
+ options.setParallelism(1);
+ Pipeline p = Pipeline.create(options);
+
+ PCollection<KV<String, Long>> wordCounts =
+ p.apply(Read.from(new UnboundedTextSource()))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+ .apply(Count.<String>perElement());
+
+ wordCounts.apply(ParDo.of(new FormatAsStringFn()));
+
+ p.run();
+
+ ClientContext clientContext = options.getClientContext();
+ clientContext.close();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
new file mode 100644
index 0000000..caf066c
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * unbounded source that reads from text.
+ */
+public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
+
+ @Override
+ public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
+ }
+
+ @Override
+ public UnboundedReader<String> createReader(PipelineOptions options,
+ @Nullable CheckpointMark checkpointMark) {
+ return new UnboundedTextReader(this);
+ }
+
+ @Nullable
+ @Override
+ public Coder<CheckpointMark> getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder<String> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ /**
+ * reads from text.
+ */
+ public static class UnboundedTextReader extends UnboundedReader<String> implements Serializable {
+
+ private static final long serialVersionUID = 7526472295622776147L;
+
+ private final UnboundedTextSource source;
+
+ private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"};
+ private long index = 0;
+
+ private String currentRecord;
+
+ private Instant currentTimestamp;
+
+ public UnboundedTextReader(UnboundedTextSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ currentRecord = texts[0];
+ currentTimestamp = new Instant(0);
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ index++;
+ currentRecord = texts[(int) index % (texts.length)];
+ currentTimestamp = new Instant(index * 1000);
+
+ return true;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return new byte[0];
+ }
+
+ @Override
+ public String getCurrent() throws NoSuchElementException {
+ return this.currentRecord;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return currentTimestamp;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return currentTimestamp;
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource<String, ?> getCurrentSource() {
+ return this.source;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
new file mode 100644
index 0000000..452127a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Wraps elements from Create.Values into an {@link UnboundedSource}.
+ * mainly used for test
+ */
+public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+
+ @Override
+ public void translate(Create.Values<T> transform, TranslationContext context) {
+ try {
+ UnboundedSourceWrapper<T, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+ new ValuesSource<>(transform.getElements(),
+ transform.getDefaultOutputCoder(context.getInput(transform))),
+ context.getPipelineOptions());
+ JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
+ context.setOutputStream(context.getOutput(transform), sourceStream);
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..b06d5a8
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
+ * Note only two-way merge is working now
+ */
+public class FlattenPCollectionTranslator<T> implements
+ TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+
+ @Override
+ public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ JavaStream<T> merged = null;
+ System.out.println("PCollectionList size " + context.getInput(transform).size());
+ for (PCollection<T> collection : context.getInput(transform).getAll()) {
+ JavaStream<T> inputStream = context.getInputStream(collection);
+ if (null == merged) {
+ merged = inputStream;
+ } else {
+ merged = merged.merge(inputStream, transform.getName());
+ }
+ }
+ context.setOutputStream(context.getOutput(transform), merged);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
new file mode 100644
index 0000000..f36b908
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link GroupByKey} is translated to Gearpump groupBy function.
+ */
+public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+ @Override
+ public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+ JavaStream<WindowedValue<KV<K, V>>> inputStream =
+ context.getInputStream(context.getInput(transform));
+ int parallelism = context.getPipelineOptions().getParallelism();
+ JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
+ .flatMap(new KeyedByKeyAndWindow<K, V>(), "keyed_by_Key_and_Window")
+ .groupBy(new GroupByKeyAndWindow<K, V>(), parallelism, "group_by_Key_and_Window")
+ .map(new ExtractKeyValue<K, V>(), "extract_Key_and_Value")
+ .reduce(new MergeValue<K, V>(), "merge_value");
+
+ context.setOutputStream(context.getOutput(transform), outputStream);
+ }
+
+ private static class KeyedByKeyAndWindow<K, V> implements
+ FlatMapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<KV<K, BoundedWindow>, V>>> {
+
+ @Override
+ public Iterator<WindowedValue<KV<KV<K, BoundedWindow>, V>>> apply(WindowedValue<KV<K, V>> wv) {
+ List<WindowedValue<KV<KV<K, BoundedWindow>, V>>> ret = new ArrayList<>(wv.getWindows().size
+ ());
+ for (BoundedWindow window : wv.getWindows()) {
+ KV<K, BoundedWindow> keyWin = KV.of(wv.getValue().getKey(), window);
+ ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()),
+ wv.getTimestamp(), window, wv.getPane()));
+ }
+ return ret.iterator();
+ }
+ }
+
+ private static class GroupByKeyAndWindow<K, V> implements
+ GroupByFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, KV<K, BoundedWindow>> {
+
+ @Override
+ public KV<K, BoundedWindow> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
+ return wv.getValue().getKey();
+ }
+ }
+
+ private static class ExtractKeyValue<K, V> implements
+ MapFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>,
+ WindowedValue<KV<K, Iterable<V>>>> {
+ @Override
+ public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
+ return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(),
+ (Iterable<V>) Collections.singletonList(wv.getValue().getValue())),
+ wv.getTimestamp(), wv.getWindows(), wv.getPane());
+ }
+ }
+
+ private static class MergeValue<K, V> implements
+ ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+ @Override
+ public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
+ WindowedValue<KV<K, Iterable<V>>> wv2) {
+ return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+ Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
+ wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..af5bcbc
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
+ * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are
+ * further filtered with Gearpump filter function by output tag
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ParDoBoundMultiTranslator<InputT, OutputT> implements
+ TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+
+ @Override
+ public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+ PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
+ JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
+ Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+
+ JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
+ new DoFnMultiFunction<>(
+ context.getPipelineOptions(),
+ transform.getFn(),
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags(),
+ inputT.getWindowingStrategy(),
+ new NoOpSideInputReader()
+ ), transform.getName());
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+ JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
+ .filter(new FilterByOutputTag<>((TupleTag<OutputT>) output.getKey())
+ , "filter_by_output_tag")
+ .map(new ExtractOutput<OutputT>(), "extract output");
+
+ context.setOutputStream(output.getValue(), taggedStream);
+ }
+ }
+
+ /**
+ * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
+ */
+ private static class DoFnMultiFunction<InputT, OutputT> implements
+ FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
+ DoFnRunners.OutputManager {
+
+ private final DoFnRunner<InputT, OutputT> doFnRunner;
+ private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
+ .newArrayList();
+
+ public DoFnMultiFunction(
+ GearpumpPipelineOptions pipelineOptions,
+ DoFn<InputT, OutputT> doFn,
+ TupleTag<OutputT> mainOutputTag,
+ TupleTagList sideOutputTags,
+ WindowingStrategy<?, ?> windowingStrategy,
+ SideInputReader sideInputReader) {
+ this.doFnRunner = new GearpumpDoFnRunner<>(
+ pipelineOptions,
+ doFn,
+ sideInputReader,
+ this,
+ mainOutputTag,
+ sideOutputTags.getAll(),
+ new NoOpStepContext(),
+ windowingStrategy
+ );
+ }
+
+ @Override
+ public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+ doFnRunner.startBundle();
+ doFnRunner.processElement(wv);
+ doFnRunner.finishBundle();
+
+ return outputs.iterator();
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ KV<TupleTag<OutputT>, OutputT> kv = KV.of((TupleTag<OutputT>) tag,
+ (OutputT) output.getValue());
+ outputs.add(WindowedValue.of(kv, output.getTimestamp(),
+ output.getWindows(), output.getPane()));
+ }
+ }
+
+ private static class FilterByOutputTag<OutputT> implements
+ FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
+
+ private final TupleTag<OutputT> tupleTag;
+
+ public FilterByOutputTag(TupleTag<OutputT> tupleTag) {
+ this.tupleTag = tupleTag;
+ }
+
+ @Override
+ public boolean apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+ return wv.getValue().getKey().equals(tupleTag);
+ }
+ }
+
+ private static class ExtractOutput<OutputT> implements
+ MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
+
+ @Override
+ public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+ return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
+ wv.getWindows(), wv.getPane());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
new file mode 100644
index 0000000..689bc08
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
+/**
+ * {@link ParDo.Bound} is translated to Gearpump flatMap function
+ * with {@link DoFn} wrapped in {@link DoFnFunction}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+ TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+
+ @Override
+ public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getFn();
+ PCollection<OutputT> output = context.getOutput(transform);
+ WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
+
+ DoFnFunction<InputT, OutputT> doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
+ doFn, windowingStrategy, new NoOpSideInputReader());
+ JavaStream<WindowedValue<InputT>> inputStream =
+ context.getInputStream(context.getInput(transform));
+ JavaStream<WindowedValue<OutputT>> outputStream =
+ inputStream.flatMap(doFnFunction, transform.getName());
+
+ context.setOutputStream(context.getOutput(transform), outputStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
new file mode 100644
index 0000000..478d58f
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * Translates {@link Read.Bounded} into a Gearpump source stream.
+ *
+ * <p>The transform's {@link BoundedSource} is wrapped in a {@link BoundedSourceWrapper}, which
+ * is handed to Gearpump as a {@link DataSource}.
+ */
+public class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+
+  @Override
+  public void translate(Read.Bounded<T> transform, TranslationContext context) {
+    BoundedSourceWrapper<T> wrapper =
+        new BoundedSourceWrapper<>(transform.getSource(), context.getPipelineOptions());
+    JavaStream<WindowedValue<T>> stream = context.getSourceStream(wrapper);
+    context.setOutputStream(context.getOutput(transform), stream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..7e12a9c
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * Translates {@link Read.Unbounded} into a Gearpump source stream.
+ *
+ * <p>The transform's {@link UnboundedSource} is wrapped in an {@link UnboundedSourceWrapper},
+ * which is handed to Gearpump as a {@link DataSource}.
+ */
+public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> source = transform.getSource();
+    UnboundedSourceWrapper<T, ?> wrapper =
+        new UnboundedSourceWrapper<>(source, context.getPipelineOptions());
+    JavaStream<WindowedValue<T>> stream = context.getSourceStream(wrapper);
+    context.setOutputStream(context.getOutput(transform), stream);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
new file mode 100644
index 0000000..1ed6d5d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.io.Serializable;
+
+/**
+ * Translates one kind of {@link PTransform} into Gearpump stream operations.
+ *
+ * @param <T> the transform type handled by this translator
+ */
+public interface TransformTranslator<T extends PTransform> extends Serializable {
+
+  /**
+   * Translates {@code transform}. Implementations typically look up their input streams in
+   * {@code context} and register the streams they produce there.
+   *
+   * @param transform the transform to translate
+   * @param context translation state shared across the pipeline
+   */
+  void translate(T transform, TranslationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
new file mode 100644
index 0000000..b9b2c7a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.source.DataSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TranslationContext {
+
+ private final JavaStreamApp streamApp;
+ private final GearpumpPipelineOptions pipelineOptions;
+ private AppliedPTransform<?, ?, ?> currentTransform;
+ private final Map<PValue, JavaStream<?>> streams = new HashMap<>();
+
+ public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
+ this.streamApp = streamApp;
+ this.pipelineOptions = pipelineOptions;
+
+ }
+
+ public void setCurrentTransform(TransformTreeNode treeNode) {
+ this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+ treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+ }
+
+ public GearpumpPipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
+
+ public <InputT> JavaStream<InputT> getInputStream(PValue input) {
+ return (JavaStream<InputT>) streams.get(input);
+ }
+
+ public <OutputT> void setOutputStream(PValue output, JavaStream<OutputT> outputStream) {
+ if (!streams.containsKey(output)) {
+ streams.put(output, outputStream);
+ }
+ }
+
+ public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
+ return (InputT) getCurrentTransform(transform).getInput();
+ }
+
+ public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
+ return (OutputT) getCurrentTransform(transform).getOutput();
+ }
+
+ private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
+ checkArgument(
+ currentTransform != null && currentTransform.getTransform() == transform,
+ "can only be called with current transform");
+ return currentTransform;
+ }
+
+ public <T> JavaStream<T> getSourceStream(DataSource dataSource) {
+ return streamApp.source(dataSource, pipelineOptions.getParallelism(),
+ UserConfig.empty(), "source");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
new file mode 100644
index 0000000..088fc14
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.functions;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.client.util.Lists;
+
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Adapts a Beam {@link DoFn} to a Gearpump {@link FlatMapFunction}.
+ *
+ * <p>Each input element is processed as a bundle of its own. Only the main output is
+ * supported: emitting under any other tag throws a {@link RuntimeException}.
+ */
+public class DoFnFunction<InputT, OutputT> implements
+    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+
+  private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {
+  };
+  private final DoFnRunner<InputT, OutputT> doFnRunner;
+  /** Receives the outputs of the element currently being processed. */
+  private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+
+  public DoFnFunction(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      SideInputReader sideInputReader) {
+    // This instance is the output manager. An empty side-output tag list is passed, and
+    // output() rejects any tag other than mainTag.
+    this.doFnRunner = new GearpumpDoFnRunner<>(
+        pipelineOptions, doFn, sideInputReader, this, mainTag,
+        TupleTagList.empty().getAll(), new NoOpStepContext(), windowingStrategy);
+  }
+
+  @Override
+  public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
+    // Install a new buffer per element, so an iterator returned for an earlier element is
+    // never touched again.
+    List<WindowedValue<OutputT>> elementOutputs = Lists.newArrayList();
+    outputs = elementOutputs;
+
+    doFnRunner.startBundle();
+    doFnRunner.processElement(value);
+    doFnRunner.finishBundle();
+
+    return elementOutputs.iterator();
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    if (!mainTag.equals(tag)) {
+      throw new RuntimeException("output is not of main tag");
+    }
+    outputs.add((WindowedValue<OutputT>) output);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9478f411/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
new file mode 100644
index 0000000..f25d113
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.IOException;
+
+/**
+ * Exposes a Beam {@link BoundedSource} through the Gearpump DataSource API.
+ *
+ * <p>Reader creation is delegated to the wrapped source.
+ */
+public class BoundedSourceWrapper<T> extends GearpumpSource<T> {
+
+  private final BoundedSource<T> boundedSource;
+
+  public BoundedSourceWrapper(BoundedSource<T> source, PipelineOptions options) {
+    super(options);
+    this.boundedSource = source;
+  }
+
+  /** Opens a reader on the wrapped {@link BoundedSource} with the given {@code options}. */
+  @Override
+  protected Source.Reader<T> createReader(PipelineOptions options) throws IOException {
+    Source.Reader<T> reader = boundedSource.createReader(options);
+    return reader;
+  }
+}