You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/17 16:50:54 UTC
[02/11] incubator-beam git commit: BEAM-261 Apex runner PoC
BEAM-261 Apex runner PoC
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaf38ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaf38ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaf38ddf
Branch: refs/heads/apex-runner
Commit: aaf38ddfe53bbb67fad4456ee1068d18b9b891b5
Parents: 49f9444
Author: Thomas Weise <th...@apache.org>
Authored: Mon Jun 27 11:24:13 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Oct 16 23:21:55 2016 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 226 ++++++++++
.../beam/runners/apex/ApexPipelineOptions.java | 60 +++
.../runners/apex/ApexPipelineTranslator.java | 134 ++++++
.../apache/beam/runners/apex/ApexRunner.java | 171 ++++++++
.../beam/runners/apex/ApexRunnerResult.java | 85 ++++
.../beam/runners/apex/TestApexRunner.java | 56 +++
.../translators/CreateValuesTranslator.java | 49 +++
.../FlattenPCollectionTranslator.java | 52 +++
.../apex/translators/GroupByKeyTranslator.java | 41 ++
.../apex/translators/ParDoBoundTranslator.java | 43 ++
.../translators/ReadUnboundedTranslator.java | 42 ++
.../apex/translators/TransformTranslator.java | 31 ++
.../apex/translators/TranslationContext.java | 143 +++++++
.../functions/ApexGroupByKeyOperator.java | 427 +++++++++++++++++++
.../functions/ApexParDoOperator.java | 177 ++++++++
.../io/ApexReadUnboundedInputOperator.java | 125 ++++++
.../apex/translators/io/ValuesSource.java | 152 +++++++
.../apex/translators/utils/ApexStreamTuple.java | 191 +++++++++
.../utils/CoderAdapterStreamCodec.java | 73 ++++
.../translators/utils/NoOpSideInputReader.java | 47 ++
.../apex/translators/utils/NoOpStepContext.java | 73 ++++
.../utils/SerializablePipelineOptions.java | 61 +++
.../apex/examples/StreamingWordCountTest.java | 120 ++++++
.../apex/examples/UnboundedTextSource.java | 144 +++++++
.../FlattenPCollectionTranslatorTest.java | 97 +++++
.../translators/GroupByKeyTranslatorTest.java | 248 +++++++++++
.../translators/ParDoBoundTranslatorTest.java | 164 +++++++
.../translators/ReadUnboundTranslatorTest.java | 130 ++++++
.../translators/utils/CollectionSource.java | 137 ++++++
.../translators/utils/PipelineOptionsTest.java | 82 ++++
.../apex/src/test/resources/log4j.properties | 33 ++
runners/pom.xml | 1 +
32 files changed, 3615 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
new file mode 100644
index 0000000..bb08b3c
--- /dev/null
+++ b/runners/apex/pom.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-parent</artifactId>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-runners-apex_3.4.0</artifactId>
+
+ <name>Apache Beam :: Runners :: Apex</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <apex.core.version>3.4.0</apex.core.version>
+ <apex.malhar.version>3.4.0</apex.malhar.version>
+ <skipIntegrationTests>true</skipIntegrationTests>
+ <!-- memory limit for embedded cluster -->
+ <surefire.args>-Xmx2048m</surefire.args>
+ </properties>
+
+ <dependencies>
+ <!-- Apex dependencies -->
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${apex.malhar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.core.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Beam -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <!-- javax.annotation.Nullable -->
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <!-- Test scoped -->
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Depend on test jar to scan for RunnableOnService tests -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!--dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-examples-java</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency-->
+ <!-- Optional Pipeline Registration -->
+ <!--dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency-->
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Disabled for now because of Checkstyle errors
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ -->
+
+ <!-- Integration Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+
+ <!-- Unit Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>${surefire.args}</argLine>
+ </configuration>
+ <executions>
+ <execution>
+ <id>runnable-on-service-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <parallel>none</parallel>
+ <failIfNoTests>true</failIfNoTests>
+ <dependenciesToScan>
+ <dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ </dependenciesToScan>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=org.apache.beam.runners.apex.TestApexRunner",
+ "--streaming=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ <skipTests>${skipIntegrationTests}</skipTests>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>analyze-only</goal>
+ </goals>
+ <configuration>
+ <ignoredUsedUndeclaredDependencies>
+ <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.4.0</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.2.0</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
+ </ignoredUsedUndeclaredDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
new file mode 100644
index 0000000..f70d24c
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Pipeline options understood by the Apex runner.
+ *
+ * <p>Every property is declared as a getter/setter pair. Where a default exists it is
+ * given by the {@code @Default} annotation on the getter.
+ */
+public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializable {
+
+  /** Returns the application name; no default value is declared for it. */
+  String getApplicationName();
+
+  @Description("set unique application name for Apex runner")
+  void setApplicationName(String name);
+
+  /** Returns the configured parallelism, {@code 1} unless set otherwise. */
+  @Default.Integer(1)
+  int getParallelism();
+
+  @Description("set parallelism for Apex runner")
+  void setParallelism(int parallelism);
+
+  /** Returns whether the pipeline runs on an embedded cluster; defaults to {@code true}. */
+  @Default.Boolean(true)
+  boolean isEmbeddedExecution();
+
+  @Description("execute the pipeline with embedded cluster")
+  void setEmbeddedExecution(boolean embedded);
+
+  /**
+   * Returns whether embedded execution uses debug friendly settings; defaults to
+   * {@code true}.
+   */
+  @Default.Boolean(true)
+  boolean isEmbeddedExecutionDebugMode();
+
+  @Description("configure embedded execution with debug friendly options")
+  void setEmbeddedExecutionDebugMode(boolean embeddedDebug);
+
+  /**
+   * Returns how many milliseconds the client should wait for the pipeline to run;
+   * defaults to {@code 0}.
+   */
+  @Default.Long(0)
+  long getRunMillis();
+
+  @Description("how long the client should wait for the pipeline to run")
+  void setRunMillis(long runMillis);
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
new file mode 100644
index 0000000..8ea7139
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
+import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
+import org.apache.beam.runners.apex.translators.TransformTranslator;
+import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Translates {@link Pipeline} objects into an Apex logical plan ({@code DAG}) by visiting
+ * the pipeline and delegating every primitive transform to the
+ * {@link TransformTranslator} registered for the transform's class.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
+
+  /**
+   * Registry keyed by the concrete {@link PTransform} class; the value is the
+   * {@link TransformTranslator} responsible for transforms of exactly that class.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  static {
+    // The set of transforms this runner knows how to translate.
+    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(
+        Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+  }
+
+  /** Context handed to every translator invoked by this visitor. */
+  private final TranslationContext translationContext;
+
+  /**
+   * Creates a translator that passes the given context to each transform translator.
+   *
+   * @param translationContext context passed to every {@link TransformTranslator}
+   */
+  public ApexPipelineTranslator(TranslationContext translationContext) {
+    this.translationContext = translationContext;
+  }
+
+  /**
+   * Traverses the pipeline topologically with this object as the visitor.
+   *
+   * @param pipeline the pipeline to translate
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  /** Logs the composite and asks the traversal to descend into it. */
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  /** Only logs; composites need no work on the way out. */
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  /**
+   * Looks up the translator for the primitive transform and runs it against the
+   * translation context.
+   *
+   * @throws IllegalStateException if no translator is registered for the transform's class
+   */
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform primitive = node.getTransform();
+    TransformTranslator handler = getTransformTranslator(primitive.getClass());
+    if (handler == null) {
+      throw new IllegalStateException("no translator registered for " + primitive);
+    }
+    // The context must know the current node before the translator runs.
+    translationContext.setCurrentTransform(node);
+    handler.translate(primitive, translationContext);
+  }
+
+  /** Only logs; values need no translation of their own. */
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Records that instances of the given PTransform class are translated by the given
+   * {@link TransformTranslator}.
+   *
+   * @throws IllegalArgumentException if a translator was already registered for the class
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    TransformTranslator previous = transformTranslators.put(transformClass, transformTranslator);
+    if (previous != null) {
+      throw new IllegalArgumentException("defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} registered for the given PTransform class,
+   * or {@code null} when there is none.
+   */
+  private <TransformT extends PTransform<?,?>>
+      TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
new file mode 100644
index 0000000..87c8f97
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.base.Throwables;
+
+/**
+ * A {@link PipelineRunner} that translates the pipeline to an Apex DAG and executes it.
+ *
+ * <p>Currently execution is always in embedded mode; launching on a Hadoop cluster
+ * will be added in a subsequent iteration.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
+
+  private final ApexPipelineOptions options;
+
+  /**
+   * Creates a runner that is configured by the given options.
+   *
+   * @param options the Apex specific pipeline options
+   */
+  public ApexRunner(ApexPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Creates a runner from generic pipeline options.
+   *
+   * <p>The options are viewed as {@link ApexPipelineOptions} through
+   * {@link PipelineOptions#as(Class)} instead of a downcast, so options objects that were
+   * not created as {@link ApexPipelineOptions} are accepted as well.
+   *
+   * @param options the pipeline options to run with
+   * @return a new runner
+   */
+  public static ApexRunner fromOptions(PipelineOptions options) {
+    return new ApexRunner(options.as(ApexPipelineOptions.class));
+  }
+
+  /**
+   * Applies the transform, substituting runner specific expansions for
+   * {@code Window.Bound}, {@code Create.Values} and {@code Read.Bounded}; every other
+   * transform is applied by the superclass.
+   */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (Window.Bound.class.equals(transform.getClass())) {
+      return (OutputT) ((PCollection) input).apply(
+          new AssignWindowsAndSetStrategy((Window.Bound) transform));
+    } else if (Create.Values.class.equals(transform.getClass())) {
+      // Create.Values is kept as a primitive; only its output collection is created here.
+      return (OutputT) PCollection
+          .<OutputT>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              PCollection.IsBounded.BOUNDED);
+    } else if (Read.Bounded.class.equals(transform.getClass())) {
+      // Bounded reads are executed through the unbounded read adapter.
+      return (OutputT) ((PBegin) input).apply(
+          new UnboundedReadFromBoundedSource<>(((Read.Bounded) transform).getSource()));
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+  /**
+   * Translates the pipeline and runs it in embedded (local) mode.
+   *
+   * <p>When {@code runMillis} is positive the call blocks for that long, otherwise the
+   * DAG is launched asynchronously.
+   *
+   * @param pipeline the pipeline to execute
+   * @return the result giving access to the DAG and the local mode controller
+   * @throws IllegalArgumentException if embedded execution is disabled in the options
+   */
+  @Override
+  public ApexRunnerResult run(Pipeline pipeline) {
+    // Fail fast: there is no point in translating a pipeline that cannot be launched.
+    checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time");
+
+    final TranslationContext translationContext = new TranslationContext(options);
+    ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
+    translator.translate(pipeline);
+
+    StreamingApplication apexApp = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf) {
+        dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
+        translationContext.populateDAG(dag);
+      }
+    };
+
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    try {
+      lma.prepareDAG(apexApp, conf);
+      LocalMode.Controller lc = lma.getController();
+      if (options.isEmbeddedExecutionDebugMode()) {
+        // turns off timeout checking for operator progress
+        lc.setHeartbeatMonitoringEnabled(false);
+      }
+      if (options.getRunMillis() > 0) {
+        lc.run(options.getRunMillis());
+      } else {
+        lc.runAsync();
+      }
+      return new ApexRunnerResult(lma.getDAG(), lc);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Copied from DirectPipelineRunner.
+   * Used to replace Window.Bound until an equivalent function is added in Apex.
+   */
+  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  /** Outputs every input element unchanged. */
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
new file mode 100644
index 0000000..f28c8dc
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * {@link PipelineResult} of a {@link Pipeline} that was executed with Apex in embedded mode.
+ *
+ * <p>Holds the Apex DAG together with the {@link LocalMode.Controller} of the run.
+ */
+public class ApexRunnerResult implements PipelineResult {
+  private final DAG apexDAG;
+  private final LocalMode.Controller ctrl;
+  // Starts as UNKNOWN; within this class it is only changed by cancel().
+  private State state = State.UNKNOWN;
+
+  /**
+   * @param dag the DAG that is executed
+   * @param ctrl the controller of the local mode execution
+   */
+  public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) {
+    this.apexDAG = dag;
+    this.ctrl = ctrl;
+  }
+
+  /** Returns the last state recorded by this result object. */
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  /** Always returns {@code null}; no aggregator values are retrieved here. */
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    return null;
+  }
+
+  /**
+   * Shuts down the local mode controller and records the cancelled state.
+   *
+   * @return {@link State#CANCELLED}
+   */
+  @Override
+  public State cancel() throws IOException {
+    ctrl.shutdown();
+    state = State.CANCELLED;
+    return state;
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public State waitUntilFinish() throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the DAG executed by the pipeline.
+   *
+   * @return the Apex DAG passed to the constructor
+   */
+  public DAG getApexDAG() {
+    return apexDAG;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
new file mode 100644
index 0000000..45c143e
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+
+/**
+ * A {@link PipelineRunner} for tests that delegates to {@link ApexRunner} after forcing
+ * embedded execution and a fixed run time.
+ */
+public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
+
+  /** Run time, in milliseconds, that is set on the options of every test run. */
+  private static final long RUN_MILLIS = 20000;
+
+  private final ApexRunner delegate;
+
+  private TestApexRunner(ApexPipelineOptions options) {
+    options.setEmbeddedExecution(true);
+    // The embedded debug mode option is deliberately left as configured.
+    options.setRunMillis(RUN_MILLIS);
+    this.delegate = ApexRunner.fromOptions(options);
+  }
+
+  /**
+   * Validates the options as {@link ApexPipelineOptions} and creates the test runner.
+   *
+   * @param options the pipeline options of the test
+   * @return a new test runner
+   */
+  public static TestApexRunner fromOptions(PipelineOptions options) {
+    ApexPipelineOptions validated =
+        PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+    return new TestApexRunner(validated);
+  }
+
+  /** Forwards to the wrapped {@link ApexRunner}. */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  /** Forwards to the wrapped {@link ApexRunner}. */
+  @Override
+  public ApexRunnerResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
new file mode 100644
index 0000000..387b19f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+
+import com.google.common.base.Throwables;
+
+
+/**
+ * Translates {@link Create.Values} by wrapping its elements into a {@link ValuesSource}
+ * (used as an {@link UnboundedSource}) that is read by an
+ * {@link ApexReadUnboundedInputOperator}. Mainly used for tests.
+ */
+public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+  private static final long serialVersionUID = 1451000241832745629L;
+
+  /**
+   * Adds a read operator for the transform's elements to the translation context.
+   *
+   * @param transform the {@link Create.Values} transform being translated
+   * @param context translation context that receives the operator and its output port
+   * @throws RuntimeException wrapping {@link CannotProvideCoderException} when no default
+   *     output coder can be determined for the elements
+   */
+  @Override
+  public void translate(Create.Values<T> transform, TranslationContext context) {
+    try {
+      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
+          transform.getDefaultOutputCoder(context.getInput()));
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    } catch (CannotProvideCoderException e) {
+      // propagate() always throws; the explicit "throw" makes that visible to the compiler
+      // and to readers, so nothing can accidentally be executed after the catch block.
+      throw Throwables.propagate(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..f228149
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.datatorrent.lib.stream.StreamMerger;
+
+/**
+ * Flatten.FlattenPCollectionList translation to Apex operator.
+ * TODO: support more than two streams
+ */
+public class FlattenPCollectionTranslator<T> implements
+ TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ StreamMerger<T> operator = null;
+ PCollectionList<T> collections = context.getInput();
+ if (collections.size() > 2) {
+ throw new UnsupportedOperationException("Currently supports only 2 collections: " + transform);
+ }
+ for (PCollection<T> collection : collections.getAll()) {
+ if (null == operator) {
+ operator = new StreamMerger<T>();
+ context.addStream(collection, operator.data1);
+ } else {
+ context.addStream(collection, operator.data2);
+ }
+ }
+ context.addOperator(operator, operator.out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
new file mode 100644
index 0000000..43c82a9
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates {@link GroupByKey} into an {@link ApexGroupByKeyOperator}.
+ */
+public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the grouping operator for the current input collection, registers its output
+   * port and connects the input collection to its input port.
+   */
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    PCollection<KV<K, V>> keyedInput = context.getInput();
+    ApexGroupByKeyOperator<K, V> gbkOperator =
+        new ApexGroupByKeyOperator<>(context.getPipelineOptions(), keyedInput);
+
+    context.addOperator(gbkOperator, gbkOperator.output);
+    context.addStream(keyedInput, gbkOperator.input);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
new file mode 100644
index 0000000..a958234
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link ParDo.Bound} is translated to an {@link ApexParDoOperator} that wraps the
+ * transform's {@link OldDoFn}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Builds the ParDo operator from the transform's function and the output collection's
+   * windowing strategy, registers its output port and wires the current input to it.
+   * Side inputs are served by a {@link NoOpSideInputReader}.
+   */
+  @Override
+  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> userFn = transform.getFn();
+    PCollection<OutputT> outputCollection = context.getOutput();
+
+    ApexParDoOperator<InputT, OutputT> parDoOperator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
+        userFn,
+        outputCollection.getWindowingStrategy(),
+        new NoOpSideInputReader());
+
+    context.addOperator(parDoOperator, parDoOperator.output);
+    context.addStream(context.getInput(), parDoOperator.input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
new file mode 100644
index 0000000..b53e4dd
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+import com.datatorrent.api.InputOperator;
+
+/**
+ * {@link Read.Unbounded} is translated to an Apex {@link InputOperator}
+ * ({@link ApexReadUnboundedInputOperator}) that wraps the transform's
+ * {@link UnboundedSource}.
+ */
+public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Wraps the transform's source in a read operator and registers the operator together
+   * with its output port.
+   */
+  @Override
+  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
+    UnboundedSource<T, ?> source = transform.getSource();
+    ApexReadUnboundedInputOperator<T, ?> readOperator =
+        new ApexReadUnboundedInputOperator<>(source, context.getPipelineOptions());
+    context.addOperator(readOperator, readOperator.output);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
new file mode 100644
index 0000000..1a99885
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.io.Serializable;
+
+/**
+ * Translates a {@link PTransform} to Apex functions.
+ *
+ * @param <T> the type of transform handled by this translator
+ */
+public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
+
+  /**
+   * Translates {@code transform}, recording the result in {@code context}.
+   *
+   * @param transform the transform to translate
+   * @param context the translation context to record operators and streams in
+   */
+  void translate(T transform, TranslationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
new file mode 100644
index 0000000..92afd58
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ *
+ * <p>Collects operators under unique names and, per {@link PCollection}, the output port
+ * that produces it together with the input ports that consume it.
+ * {@link #populateDAG(DAG)} transfers both to an Apex {@link DAG}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TranslationContext {
+
+  private final ApexPipelineOptions pipelineOptions;
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  /** Producing output port and consuming input ports, keyed by the collection they carry. */
+  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
+  /** Registered operators keyed by their unique DAG name. */
+  private final Map<String, Operator> operators = new HashMap<>();
+
+  public TranslationContext(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  /**
+   * Sets the transform that subsequent {@link #getInput()}, {@link #getOutput()} and
+   * {@link #addOperator(Operator, OutputPort)} calls refer to.
+   */
+  public void setCurrentTransform(TransformTreeNode treeNode) {
+    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+  }
+
+  public ApexPipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  /** Returns the input of the current transform, cast (unchecked) to the caller's type. */
+  public <InputT extends PInput> InputT getInput() {
+    return (InputT) getCurrentTransform().getInput();
+  }
+
+  /** Returns the output of the current transform, cast (unchecked) to the caller's type. */
+  public <OutputT extends POutput> OutputT getOutput() {
+    return (OutputT) getCurrentTransform().getOutput();
+  }
+
+  /**
+   * @throws IllegalArgumentException if no current transform has been set
+   */
+  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    checkArgument(currentTransform != null, "current transform not set");
+    return currentTransform;
+  }
+
+  /**
+   * Registers the operator for the current transform and records {@code port} as the
+   * producer of the current transform's output collection.
+   *
+   * @param operator the operator to register
+   * @param port the operator's output port that emits the transform's output
+   */
+  public void addOperator(Operator operator, OutputPort port) {
+    // Apex DAG requires a unique operator name: start with the transform's full name and
+    // append 1, 2, ... until the name is not taken.
+    String baseName = getCurrentTransform().getFullName();
+    String name = baseName;
+    int suffix = 1;
+    while (this.operators.containsKey(name)) {
+      name = baseName + suffix;
+      suffix++;
+    }
+    this.operators.put(name, operator);
+    PCollection<?> output = getOutput();
+    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
+  }
+
+  /**
+   * Add operator that is internal to a transformation. A new primitive output collection
+   * is created from the current input's pipeline, windowing strategy and boundedness,
+   * and {@code port} is recorded as its producer.
+   *
+   * @param operator the operator to register
+   * @param port the operator's output port
+   * @param name the operator name; must not already be registered
+   * @param coder the coder to set on the created output collection
+   * @return the created output collection
+   * @throws IllegalArgumentException if an operator with {@code name} already exists
+   */
+  public <T> PInput addInternalOperator(Operator operator, OutputPort port, String name, Coder<T> coder) {
+    checkArgument(this.operators.get(name) == null, "duplicate operator " + name);
+    this.operators.put(name, operator);
+    PCollection<T> input = getInput();
+    PCollection<T> output = PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    output.setCoder(coder);
+    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
+    return output;
+  }
+
+  /**
+   * Records {@code inputPort} as a consumer of {@code input}.
+   *
+   * @throws IllegalArgumentException if no producer has been registered for {@code input}
+   */
+  public void addStream(PInput input, InputPort inputPort) {
+    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
+    checkArgument(stream != null, "no upstream operator defined");
+    stream.getRight().add(inputPort);
+  }
+
+  /**
+   * Adds all registered operators and every stream that has at least one consumer to
+   * {@code dag}, and sets a stream codec on each consuming input port.
+   *
+   * @param dag the Apex DAG to populate
+   */
+  public void populateDAG(DAG dag) {
+    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
+      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
+    }
+    int streamIndex = 0;
+    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry
+        : this.streams.entrySet()) {
+      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
+      if (sinksList.isEmpty()) {
+        // collection without consumers: nothing to connect
+        continue;
+      }
+      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
+      dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
+
+      // The coder depends only on the collection, so build it once per stream rather
+      // than once per sink port.
+      PCollection pc = streamEntry.getKey();
+      Coder coder = pc.getCoder();
+      if (pc.getWindowingStrategy() != null) {
+        coder = FullWindowedValueCoder.of(pc.getCoder(),
+            pc.getWindowingStrategy().getWindowFn().windowCoder());
+      }
+      Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder);
+      for (InputPort port : sinks) {
+        CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
+        dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
new file mode 100644
index 0000000..4608c92
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.functions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Apex operator for Beam {@link GroupByKey}.
+ * This operator expects the input stream already partitioned by K,
+ * which is determined by the {@link StreamCodec} on the input port.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ApexGroupByKeyOperator<K, V> implements Operator
+{
+ @Bind(JavaSerializer.class)
+ private WindowingStrategy<V, BoundedWindow> windowingStrategy;
+ @Bind(JavaSerializer.class)
+ private Coder<V> valueCoder;
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions serializedOptions;
+ @Bind(JavaSerializer.class)
+ private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+ private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+ private transient ProcessContext context;
+ private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
+ private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+ private Instant inputWatermark = new Instant(0);
+
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>()
+ {
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
+ {
+ //System.out.println("\n***RECEIVED: " +t);
+ try {
+ if (t instanceof ApexStreamTuple.WatermarkTuple) {
+ ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
+ processWatermark(mark);
+ output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
+ return;
+ }
+ processElement(t.getValue());
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+ }
+ };
+
+ @OutputPortFieldAnnotation(optional=true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>();
+
+ @SuppressWarnings("unchecked")
+ public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input)
+ {
+ Preconditions.checkNotNull(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
+ this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
+ this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ApexGroupByKeyOperator()
+ {
+ this.serializedOptions = null;
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
+ this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
+ SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
+ this.context = new ProcessContext(fn, this.timerInternals);
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ /**
+ * Returns the list of timers that are ready to fire. These are the timers
+ * that are registered to be triggered at a time before the current watermark.
+ * We keep these timers in a Set, so that they are deduplicated, as the same
+ * timer can be registered multiple times.
+ */
+ private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+
+ // we keep the timers to return in a different list and launch them later
+ // because we cannot prevent a trigger from registering another trigger,
+ // which would lead to concurrent modification exception.
+ Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+ Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+ Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+ while (timerIt.hasNext()) {
+ TimerInternals.TimerData timerData = timerIt.next();
+ if (timerData.getTimestamp().isBefore(currentWatermark)) {
+ toFire.put(keyWithTimers.getKey(), timerData);
+ timerIt.remove();
+ }
+ }
+
+ if (keyWithTimers.getValue().isEmpty()) {
+ it.remove();
+ }
+ }
+ return toFire;
+ }
+
+ private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
+ final KV<K, V> kv = windowedValue.getValue();
+ final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
+ windowedValue.getTimestamp(),
+ windowedValue.getWindows(),
+ windowedValue.getPane());
+
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
+ kv.getKey(),
+ Collections.singletonList(updatedWindowedValue));
+
+ context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+ fn.processElement(context);
+ }
+
+ private StateInternals<K> getStateInternalsForKey(K key) {
+ StateInternals<K> stateInternals = perKeyStateInternals.get(key);
+ if (stateInternals == null) {
+ //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+ //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
+ stateInternals = InMemoryStateInternals.forKey(key);
+ perKeyStateInternals.put(key, stateInternals);
+ }
+ return stateInternals;
+ }
+
+ private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ if (timersForKey == null) {
+ timersForKey = new HashSet<>();
+ }
+ timersForKey.add(timer);
+ activeTimers.put(key, timersForKey);
+ }
+
+ private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ if (timersForKey != null) {
+ timersForKey.remove(timer);
+ if (timersForKey.isEmpty()) {
+ activeTimers.remove(key);
+ } else {
+ activeTimers.put(key, timersForKey);
+ }
+ }
+ }
+
+ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
+ this.inputWatermark = new Instant(mark.getTimestamp());
+ Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+ if (!timers.isEmpty()) {
+ for (K key : timers.keySet()) {
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key));
+ context.setElement(kwi, getStateInternalsForKey(kwi.key()));
+ fn.processElement(context);
+ }
+ }
+ }
+
+ private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext {
+
+ private final ApexTimerInternals timerInternals;
+ private StateInternals<K> stateInternals;
+ private KeyedWorkItem<K, V> element;
+
+ public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
+ ApexTimerInternals timerInternals) {
+ function.super();
+ this.timerInternals = Preconditions.checkNotNull(timerInternals);
+ }
+
+ public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
+ this.element = element;
+ this.stateInternals = stateForKey;
+ }
+
+ @Override
+ public KeyedWorkItem<K, V> element() {
+ return this.element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return serializedOptions.get();
+ }
+
+ @Override
+ public void output(KV<K, Iterable<V>> output) {
+ throw new UnsupportedOperationException(
+ "output() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
+ throw new UnsupportedOperationException(
+ "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new UnsupportedOperationException(
+ "window() is not available when processing KeyedWorkItems.");
+ }
+
+ @Override
+ public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
+ return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
+
+ @Override
+ public StateInternals<K> stateInternals() {
+ return stateInternals;
+ }
+
+ @Override
+ public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp);
+ ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+ }
+
+ @Override
+ public PaneInfo pane() {
+ throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+ throw new RuntimeException("sideInput() is not available in Streaming mode.");
+ }
+ };
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ // ignore the side output, this can happen when a user does not register
+ // side outputs but then outputs using a freshly created TupleTag.
+ throw new RuntimeException("sideOutput() is not available when grouping by window.");
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ sideOutput(tag, output);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * An implementation of Beam's {@link TimerInternals}.
+ *
+ */
+ public class ApexTimerInternals implements TimerInternals {
+
+ @Override
+ public void setTimer(TimerData timerKey)
+ {
+ registerActiveTimer(context.element().key(), timerKey);
+ }
+
+ @Override
+ public void deleteTimer(TimerData timerKey)
+ {
+ unregisterActiveTimer(context.element().key(), timerKey);
+ }
+
+ @Override
+ public Instant currentProcessingTime()
+ {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant currentSynchronizedProcessingTime()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime()
+ {
+ return inputWatermark;
+ }
+
+ @Override
+ public Instant currentOutputWatermarkTime()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+
+ private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable
+ {
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key)
+ {
+ return getStateInternalsForKey(key);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
new file mode 100644
index 0000000..8005832
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.functions;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Apex operator for Beam {@link DoFn}.
+ */
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+
+ private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
+ private transient DoFnRunner<InputT, OutputT> doFnRunner;
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions pipelineOptions;
+ @Bind(JavaSerializer.class)
+ private final OldDoFn<InputT, OutputT> doFn;
+ @Bind(JavaSerializer.class)
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ @Bind(JavaSerializer.class)
+ private final SideInputReader sideInputReader;
+
+ public ApexParDoOperator(
+ ApexPipelineOptions pipelineOptions,
+ OldDoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ SideInputReader sideInputReader) {
+ this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
+ this.doFn = doFn;
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputReader = sideInputReader;
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ApexParDoOperator() {
+ this(null, null, null, null);
+ }
+
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+ {
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> t)
+ {
+ if (t instanceof ApexStreamTuple.WatermarkTuple) {
+ output.emit(t);
+ } else {
+ System.out.println("\n" + Thread.currentThread().getName() + "\n" + t.getValue() + "\n");
+ doFnRunner.processElement(t.getValue());
+ }
+ }
+ };
+
+ @OutputPortFieldAnnotation(optional=true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
+ {
+ output.emit(ApexStreamTuple.DataTuple.of(tuple));
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(),
+ doFn,
+ sideInputReader,
+ this,
+ mainTag,
+ TupleTagList.empty().getAll(),
+ new NoOpStepContext(),
+ new NoOpAggregatorFactory(),
+ windowingStrategy
+ );
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ doFnRunner.startBundle();
+ /*
+ Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
+ if (!aggregators.isEmpty()) {
+ System.out.println("\n" + Thread.currentThread().getName() + "\n" +AggregatorRetriever.getAggregators(doFn) + "\n");
+ }
+ */
+ }
+
+ @Override
+ public void endWindow()
+ {
+ doFnRunner.finishBundle();
+ }
+
+ /**
+ * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
+ * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
+ */
+ public class NoOpAggregatorFactory implements AggregatorFactory {
+
+ private NoOpAggregatorFactory() {
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass, ExecutionContext.StepContext step,
+ String name, CombineFn<InputT, AccumT, OutputT> combine) {
+ return new Aggregator<InputT, OutputT>() {
+
+ @Override
+ public void addValue(InputT value)
+ {
+ }
+
+ @Override
+ public String getName()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public CombineFn<InputT, ?, OutputT> getCombineFn()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ };
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
new file mode 100644
index 0000000..39114fe
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.io;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
+import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.joda.time.Instant;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.google.common.base.Throwables;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.IOException;
+
+/**
+ * Apex input operator that wraps Beam UnboundedSource.
+ */
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ implements InputOperator {
+
+ @Bind(JavaSerializer.class)
+ private final SerializablePipelineOptions pipelineOptions;
+ @Bind(JavaSerializer.class)
+ private final UnboundedSource<OutputT, CheckpointMarkT> source;
+ private transient UnboundedSource.UnboundedReader<OutputT> reader;
+ private transient boolean available = false;
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
+
+ public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
+ this.pipelineOptions = new SerializablePipelineOptions(options);
+ this.source = source;
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ApexReadUnboundedInputOperator() {
+ this.pipelineOptions = null; this.source = null;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ Instant mark = reader.getWatermark();
+ output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis()));
+ if (!available && source instanceof ValuesSource) {
+ // if it's a Create transformation and the input was consumed,
+ // terminate the stream (allows tests to finish faster)
+ BaseOperator.shutdown();
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ try {
+ reader = source.createReader(this.pipelineOptions.get(), null);
+ available = reader.start();
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ try {
+ if (!available) {
+ available = reader.advance();
+ }
+ if (available) {
+ OutputT data = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+ available = reader.advance();
+ output.emit(DataTuple.of(WindowedValue.of(
+ data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
+ }
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
new file mode 100644
index 0000000..2c4b298
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.io;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import com.google.common.base.Throwables;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * unbounded source that reads from a Java {@link Iterable}.
+ */
+public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] codedValues;
+ private final IterableCoder<T> iterableCoder;
+
+ public ValuesSource(Iterable<T> values, Coder<T> coder) {
+ this.iterableCoder = IterableCoder.of(coder);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ iterableCoder.encode(values, bos, Context.OUTER);
+ } catch (IOException ex) {
+ Throwables.propagate(ex);
+ }
+ this.codedValues = bos.toByteArray();
+ }
+
+ @Override
+ public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public UnboundedReader<T> createReader(PipelineOptions options,
+ @Nullable CheckpointMark checkpointMark) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
+ try {
+ Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
+ return new ValuesReader<>(values, this);
+ } catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Coder<CheckpointMark> getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return iterableCoder.getElemCoder();
+ }
+
+ private static class ValuesReader<T> extends UnboundedReader<T> {
+
+ private final Iterable<T> values;
+ private final UnboundedSource<T, CheckpointMark> source;
+ private transient Iterator<T> iterator;
+ private T current;
+
+ public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
+ this.values = values;
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ if (null == iterator) {
+ iterator = values.iterator();
+ }
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return current;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource<T, ?> getCurrentSource() {
+ return source;
+ }
+ }
+}