You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:34:41 UTC
[2/3] beam git commit: [BEAM-2709] Add TezRunner
[BEAM-2709] Add TezRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8994f07e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8994f07e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8994f07e
Branch: refs/heads/tez-runner
Commit: 8994f07e052e1958909d59896bcc4eff39019f3a
Parents: f10399d
Author: Scheller <sc...@186590cf3d5d.ant.amazon.com>
Authored: Tue Jun 20 14:03:44 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Fri Nov 17 12:33:57 2017 -0800
----------------------------------------------------------------------
runners/pom.xml | 1 +
runners/tez/pom.xml | 135 +++++++++++++++
.../beam/runners/tez/TezPipelineOptions.java | 27 +++
.../org/apache/beam/runners/tez/TezRunner.java | 79 +++++++++
.../beam/runners/tez/TezRunnerResult.java | 78 +++++++++
.../FlattenPCollectionTranslator.java | 34 ++++
.../tez/translation/GroupByKeyTranslator.java | 43 +++++
.../tez/translation/ParDoTranslator.java | 92 ++++++++++
.../tez/translation/ReadBoundedTranslator.java | 43 +++++
.../tez/translation/TezDoFnProcessor.java | 131 +++++++++++++++
.../tez/translation/TezPipelineTranslator.java | 139 ++++++++++++++++
.../tez/translation/TransformTranslator.java | 28 ++++
.../tez/translation/TranslationContext.java | 166 +++++++++++++++++++
.../runners/tez/translation/TranslatorUtil.java | 147 ++++++++++++++++
.../ViewCreatePCollectionViewTranslator.java | 35 ++++
.../tez/translation/WindowAssignTranslator.java | 35 ++++
.../tez/translation/WriteFilesTranslator.java | 45 +++++
.../tez/translation/io/MROutputManager.java | 67 ++++++++
.../tez/translation/io/NoOpOutputManager.java | 35 ++++
.../io/OrderedPartitionedKVOutputManager.java | 62 +++++++
.../translation/io/OutputManagerFactory.java | 39 +++++
.../tez/translation/io/TezOutputManager.java | 62 +++++++
.../io/UnorderedKVEdgeOutputManager.java | 57 +++++++
.../apache/beam/runners/tez/TezRunnerTest.java | 155 +++++++++++++++++
.../tez/translation/ParDoTranslatorTest.java | 120 ++++++++++++++
.../tez/translation/TezDoFnProcessorTest.java | 112 +++++++++++++
.../tez/translation/TranslationContextTest.java | 110 ++++++++++++
.../tez/translation/TranslatorUtilTest.java | 43 +++++
runners/tez/src/test/resources/test_input.txt | 2 +
29 files changed, 2122 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 47f3c0e..2892fe8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -65,6 +65,7 @@
</activation>
<modules>
<module>gearpump</module>
+ <module>tez</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/pom.xml
----------------------------------------------------------------------
diff --git a/runners/tez/pom.xml b/runners/tez/pom.xml
new file mode 100644
index 0000000..b7d0d6d
--- /dev/null
+++ b/runners/tez/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.0.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-tez</artifactId>
+  <version>2.0.0</version>
+
+  <name>Apache Beam :: Runners :: Tez</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <!-- Single place to change the Tez release; every Tez artifact below must use the same one. -->
+    <tez.version>0.8.4</tez.version>
+    <!-- Hadoop client libraries used together with Tez. -->
+    <tez.hadoop.version>2.7.3</tez.hadoop.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+  </build>
+
+  <dependencies>
+    <!-- Tez -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+    </dependency>
+
+    <!-- Hadoop -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${tez.hadoop.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${tez.hadoop.version}</version>
+    </dependency>
+
+    <!-- Beam -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <!-- NOTE(review): the direct runner is a compile-scope dependency; confirm it is needed
+         outside of tests. -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+    </dependency>
+
+    <!-- Test -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
new file mode 100644
index 0000000..8b37b09
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Pipeline options understood by the {@link TezRunner}.
+ *
+ * <p>No Tez-specific options exist yet, so this interface only inherits the members of
+ * {@link PipelineOptions}.
+ */
+public interface TezPipelineOptions extends PipelineOptions, java.io.Serializable {
+  // TODO: add options that configure the Tez runtime.
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
new file mode 100644
index 0000000..7d32b47
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.tez.translation.TezPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PipelineRunner} that translates the pipeline to a Tez DAG and submits it to Tez.
+ *
+ * <p>Only Tez local mode against the local file system ({@code file:///}) is configured at the
+ * moment; remote clusters are not supported yet.
+ */
+public class TezRunner extends PipelineRunner<TezRunnerResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezRunner.class);
+
+  private final TezPipelineOptions options;
+
+  private TezRunner(TezPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Creates a runner from user-supplied options after validating them as
+   * {@link TezPipelineOptions}.
+   *
+   * @param options the pipeline options to validate
+   * @return a new {@link TezRunner}
+   */
+  public static TezRunner fromOptions(PipelineOptions options) {
+    TezPipelineOptions tezOptions =
+        PipelineOptionsValidator.validate(TezPipelineOptions.class, options);
+    return new TezRunner(tezOptions);
+  }
+
+  /**
+   * Translates {@code pipeline} into a Tez {@link DAG} and submits it through a {@link TezClient}.
+   *
+   * @param pipeline the pipeline to execute
+   * @return a result wrapping the started client, used to wait for completion
+   * @throws RuntimeException if the Tez client cannot be started or the DAG cannot be submitted
+   */
+  @Override
+  public TezRunnerResult run(Pipeline pipeline) {
+    TezConfiguration config = createLocalConfig();
+
+    TezPipelineTranslator translator = new TezPipelineTranslator(options, config);
+    DAG dag = DAG.create(options.getJobName());
+    translator.translate(pipeline, dag);
+
+    TezClient client = TezClient.create("TezRun", config);
+    try {
+      client.start();
+      LOG.info("Submitting DAG {} to Tez", options.getJobName());
+      client.submitDAG(dag);
+    } catch (Exception e) {
+      // A DAG that was never submitted must not be reported as a running pipeline; release the
+      // client before surfacing the failure.
+      try {
+        client.stop();
+      } catch (Exception stopFailure) {
+        e.addSuppressed(stopFailure);
+      }
+      throw new RuntimeException(
+          "Failed to submit pipeline " + options.getJobName() + " to Tez", e);
+    }
+
+    return new TezRunnerResult(client);
+  }
+
+  /** Builds the Tez configuration used for local-mode execution. */
+  private static TezConfiguration createLocalConfig() {
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    // TODO: Support Remote Tez Configuration
+    return config;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
new file mode 100644
index 0000000..870c43c
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.client.TezClient;
+import org.joda.time.Duration;
+
+/**
+ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Tez.
+ *
+ * <p>Completion is detected by polling the application master status of the {@link TezClient}
+ * handed over by the runner.
+ */
+public class TezRunnerResult implements PipelineResult {
+
+  /** Delay between two application master status polls in {@link #waitUntilFinish(Duration)}. */
+  private static final long POLL_INTERVAL_MILLIS = 500L;
+
+  private final TezClient client;
+  private State state = State.UNKNOWN;
+
+  public TezRunnerResult(TezClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Returns the last state observed by this result. This is {@link State#UNKNOWN} until a call to
+   * {@link #waitUntilFinish} sees the application master shut down, and {@link State#DONE} after.
+   */
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return waitUntilFinish(null);
+  }
+
+  /**
+   * Blocks until the Tez application master reports {@link TezAppMasterStatus#SHUTDOWN} or the
+   * given duration elapses.
+   *
+   * @param duration maximum time to wait; {@code null} or less than one millisecond waits forever
+   * @return {@link State#DONE} once the application master has shut down, {@code null} on timeout
+   * @throws RuntimeException if the status cannot be queried or the waiting thread is interrupted
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    long deadline = computeDeadline(duration);
+    try {
+      while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN) {
+        if (System.currentTimeMillis() >= deadline) {
+          return null;
+        }
+        Thread.sleep(POLL_INTERVAL_MILLIS);
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting for the Tez pipeline to finish", e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    // NOTE(review): a shut-down application master is reported as DONE even if the DAG failed;
+    // telling success from failure would need the DAGClient returned by submitDAG.
+    state = State.DONE;
+    return state;
+  }
+
+  /**
+   * Cancellation is not implemented yet: nothing is stopped, and the current state is returned
+   * unchanged.
+   */
+  @Override
+  public State cancel() throws IOException {
+    //TODO: CODE TO CANCEL PIPELINE
+    return state;
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("Metrics are not yet supported by the Tez runner");
+  }
+
+  /**
+   * Converts a wait duration into an absolute deadline in epoch milliseconds.
+   *
+   * <p>A {@code null} or sub-millisecond duration yields {@link Long#MAX_VALUE}, meaning wait
+   * forever. Large durations saturate at that value instead of overflowing.
+   */
+  private static long computeDeadline(Duration duration) {
+    if (duration == null || duration.getMillis() < 1) {
+      return Long.MAX_VALUE;
+    }
+    long now = System.currentTimeMillis();
+    long millis = duration.getMillis();
+    return millis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + millis;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..f1f5aee
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Flatten} translation to Tez equivalent.
+ *
+ * <p>Not implemented yet: nothing is added to the {@link TranslationContext}. A warning is logged
+ * so that the untranslated transform is visible, instead of being ignored silently.
+ */
+class FlattenPCollectionTranslator<T> implements TransformTranslator<Flatten.PCollections<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslator.class);
+
+  @Override
+  public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+    LOG.warn("Flatten is not yet supported by the Tez runner; transform {} was not translated",
+        context.getCurrentName());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..8f95752
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link GroupByKey} translation for Tez.
+ *
+ * <p>No vertex is created here. The single input and single output are registered as a shuffle
+ * pair via {@link TranslationContext#addShufflePair}. The pair is intended to become a Tez
+ * {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig} edge; NOTE(review):
+ * the edge itself is built outside this class, so confirm there.
+ */
+class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTranslator.class);
+
+  /**
+   * Records the current input/output pair as a shuffle.
+   *
+   * @throws UnsupportedOperationException if the transform has more than one input or output
+   */
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    if (context.getCurrentInputs().size() > 1) {
+      throw new UnsupportedOperationException("Multiple Inputs are not yet supported");
+    }
+    if (context.getCurrentOutputs().size() > 1) {
+      throw new UnsupportedOperationException("Multiple Outputs are not yet supported");
+    }
+    PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values());
+    context.addShufflePair(input, output);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
new file mode 100644
index 0000000..9ce11e6
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.ParDo} translation to a Tez {@link Vertex}.
+ *
+ * <p>The {@link DoFn} is turned into a string by {@code TranslatorUtil.toString} and stored in
+ * the vertex {@link UserPayload}, together with the main output tag id. The vertex runs
+ * {@link TezDoFnProcessor}.
+ */
+class ParDoTranslator<InputT, OutputT> implements TransformTranslator<MultiOutput<InputT, OutputT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class);
+  // Payload keys; TezDoFnProcessor reads the same keys back.
+  private static final String OUTPUT_TAG = "OUTPUT_TAG";
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+  /**
+   * Adds a vertex running the transform's DoFn to the context.
+   *
+   * @throws NotImplementedException if the transform has more than one input or output
+   * @throws RuntimeException if the DoFn cannot be serialized or the vertex cannot be created
+   */
+  @Override
+  public void translate(MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    // Prepare input/output targets
+    if (context.getCurrentInputs().size() > 1) {
+      throw new NotImplementedException("Multiple Inputs are not yet supported");
+    } else if (context.getCurrentOutputs().size() > 1) {
+      throw new NotImplementedException("Multiple Outputs are not yet supported");
+    }
+    PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values());
+
+    // Prepare UserPayload Configuration
+    String doFnInstance = serializeDoFn(transform.getFn());
+    Configuration config = new Configuration();
+    config.set(OUTPUT_TAG, transform.getMainOutputTag().getId());
+    config.set(DO_FN_INSTANCE_TAG, doFnInstance);
+
+    boolean shuffle = isShuffleOutput(input, context);
+
+    // Create Vertex with Payload
+    String name = context.getCurrentName();
+    try {
+      UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+      ProcessorDescriptor processor =
+          ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload);
+      // A vertex fed by a shuffle gets a fixed parallelism of 1; otherwise Tez infers it.
+      //TODO: add customizable parallelism
+      Vertex vertex = shuffle
+          ? Vertex.create(name, processor, 1)
+          : Vertex.create(name, processor);
+      context.addVertex(name, vertex, input, output);
+    } catch (Exception e) {
+      throw new RuntimeException("Vertex Translation Failure from: " + e.getMessage(), e);
+    }
+  }
+
+  /** Serializes {@code doFn} for the user payload, wrapping I/O failures with their cause. */
+  private static String serializeDoFn(DoFn<?, ?> doFn) {
+    try {
+      return TranslatorUtil.toString(doFn);
+    } catch (IOException e) {
+      throw new RuntimeException("DoFn failed to serialize: " + e.getMessage(), e);
+    }
+  }
+
+  /** Returns whether {@code input} is the output side of a shuffle pair registered in the context. */
+  private static boolean isShuffleOutput(PValue input, TranslationContext context) {
+    for (Pair<PValue, PValue> pair : context.getShuffleSet()) {
+      if (pair.getRight().equals(input)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
new file mode 100644
index 0000000..3192a81
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Bounded} translation to Tez {@link DataSourceDescriptor}.
+ *
+ * <p>The data is read through {@link MRInput} with {@link TextInputFormat}. The input location is
+ * taken from the bounded source's {@code toString()}. The same data source is registered for
+ * every current output.
+ */
+class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadBoundedTranslator.class);
+
+  @Override
+  public void translate(Bounded<T> transform, TranslationContext context) {
+    // NOTE(review): assumes the source's toString() is a file path/pattern, which holds only for
+    // some sources; confirm for anything other than file-based reads.
+    String inputLocation = transform.getSource().toString();
+    //TODO: Support Configurable Input Formats
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
+        new Configuration(context.getConfig()), TextInputFormat.class, inputLocation).build();
+    context.getCurrentOutputs().values().forEach(output -> context.addSource(output, dataSource));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
new file mode 100644
index 0000000..0fde90c
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.LinkedList;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.tez.translation.io.MROutputManager;
+import org.apache.beam.runners.tez.translation.io.NoOpOutputManager;
+import org.apache.beam.runners.tez.translation.io.OrderedPartitionedKVOutputManager;
+import org.apache.beam.runners.tez.translation.io.OutputManagerFactory;
+import org.apache.beam.runners.tez.translation.io.TezOutputManager;
+import org.apache.beam.runners.tez.translation.io.UnorderedKVEdgeOutputManager;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+/**
+ * TezDoFnProcessor is the Tez Wrapper to wrap user defined functions for Tez processing
+ * The DoFn is received through the {@link UserPayload} and then run using the simple {@link DoFnRunner}
+ */
+public class TezDoFnProcessor extends SimpleProcessor {
+
+ private static final String OUTPUT_TAG = "OUTPUT_TAG";
+ private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+ private DoFn<?,?> theDoFn;
+ private String outputTag;
+
+ public TezDoFnProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ Configuration config = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ outputTag = config.get(OUTPUT_TAG, null);
+ String doFnInstance = config.get(DO_FN_INSTANCE_TAG, null);
+ theDoFn = (DoFn) TranslatorUtil.fromString(doFnInstance);
+ super.initialize();
+ }
+
+ @Override
+ public void run() throws Exception {
+ //Setup Reader
+ KeyValueReader kvReader = null;
+ KeyValuesReader kvsReader = null;
+ LogicalInput input = Iterables.getOnlyElement(getInputs().values());
+ Reader reader = input.getReader();
+ if (reader instanceof KeyValueReader) {
+ kvReader = (KeyValueReader) reader;
+ } else if (reader instanceof KeyValuesReader) {
+ kvsReader = (KeyValuesReader) reader;
+ } else {
+ throw new RuntimeException("UNSUPPORTED READER!");
+ }
+
+ //Setup Writer
+ TezOutputManager outputManager;
+ if (getOutputs().size() == 1){
+ LogicalOutput output = Iterables.getOnlyElement(getOutputs().values());
+ outputManager = OutputManagerFactory.createOutputManager(output);
+ outputManager.before();
+ } else if (getOutputs().size() == 0){
+ outputManager = new NoOpOutputManager();
+ } else {
+ throw new RuntimeException("Multiple outputs not yet supported");
+ }
+
+ //Initialize DoFnRunner
+ DoFnRunner runner = DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), theDoFn, NullSideInputReader
+ .empty(), outputManager, new TupleTag<>(outputTag), new LinkedList<>(),
+ new FakeStepContext(), WindowingStrategy.globalDefault());
+ runner.startBundle();
+
+ //Start Runner
+ if (kvsReader != null){
+ while (kvsReader.next()){
+ outputManager.setCurrentElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey())));
+ runner.processElement(WindowedValue.valueInGlobalWindow(KV.of(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey()),
+ TranslatorUtil.convertIteratorToJavaType(kvsReader.getCurrentValues()))));
+ }
+ } else if (kvReader != null){
+ while (kvReader.next()){
+ WindowedValue value = WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentKey()));
+ outputManager.setCurrentElement(value);
+ runner.processElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentValue())));
+ }
+ } else {
+ throw new RuntimeException("UNSUPPORTED READER!");
+ }
+
+ outputManager.after();
+ runner.finishBundle();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
new file mode 100644
index 0000000..7b4646a
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Walks a Beam {@link Pipeline} and converts every supported transform into the pieces of a
+ * Tez logical plan ({@link DAG}), accumulating them in a {@link TranslationContext}.
+ */
+public class TezPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezPipelineTranslator.class);
+
+  /** Translators for primitive transforms, looked up by the transform's exact class. */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  /**
+   * Translators for composite transforms that are converted as a unit; the visitor does not
+   * descend into a composite that has an entry here.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      compositeTransformTranslators = new HashMap<>();
+
+  static {
+    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator<>());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator<>());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator<>());
+    registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionTranslator<>());
+    registerTransformTranslator(
+        View.CreatePCollectionView.class, new ViewCreatePCollectionViewTranslator<>());
+    registerCompositeTransformTranslator(WriteFiles.class, new WriteFilesTranslator());
+  }
+
+  private final TranslationContext translationContext;
+
+  public TezPipelineTranslator(TezPipelineOptions options, TezConfiguration config) {
+    this.translationContext = new TranslationContext(options, config);
+  }
+
+  /**
+   * Visits {@code pipeline} in topological order, then writes the collected vertices and edges
+   * into {@code dag}.
+   *
+   * @param pipeline the Beam pipeline to translate
+   * @param dag the Tez DAG that receives the translated plan
+   */
+  public void translate(Pipeline pipeline, DAG dag) {
+    pipeline.traverseTopologically(this);
+    translationContext.populateDAG(dag);
+  }
+
+  /**
+   * Translates one primitive transform.
+   *
+   * @param node pipeline node holding the {@link PTransform} to translate
+   * @throws UnsupportedOperationException if no translator is registered for the transform class
+   */
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform primitive = node.getTransform();
+    TransformTranslator handler = transformTranslators.get(primitive.getClass());
+    if (handler == null) {
+      throw new UnsupportedOperationException("no translator registered for " + primitive);
+    }
+    applyTranslator(node, primitive, handler);
+  }
+
+  /**
+   * Translates a composite in one step when a composite translator is registered for it.
+   * Otherwise asks the traversal to descend into its parts.
+   */
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    PTransform composite = node.getTransform();
+    // The root node of the hierarchy carries no transform.
+    TransformTranslator handler =
+        composite == null ? null : compositeTransformTranslators.get(composite.getClass());
+    if (handler == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    applyTranslator(node, composite, handler);
+    return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(Node node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  @Override
+  public void visitValue(PValue value, Node producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /** Points the context at {@code node}, then runs {@code translator} on its transform. */
+  @SuppressWarnings("unchecked")
+  private void applyTranslator(Node node, PTransform transform, TransformTranslator translator) {
+    translationContext.setCurrentTransform(node);
+    translator.translate(transform, translationContext);
+  }
+
+  /**
+   * Registers the translator that handles every primitive transform of exactly
+   * {@code transformClass}.
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass, TransformTranslator<? extends TransformT> transformTranslator) {
+    addUnique(transformTranslators, transformClass, transformTranslator);
+  }
+
+  /**
+   * Registers the translator that handles every composite transform of exactly
+   * {@code transformClass}.
+   */
+  private static <TransformT extends PTransform> void registerCompositeTransformTranslator(
+      Class<TransformT> transformClass, TransformTranslator<? extends TransformT> transformTranslator) {
+    addUnique(compositeTransformTranslators, transformClass, transformTranslator);
+  }
+
+  /** Inserts into {@code registry}, rejecting a second translator for the same class. */
+  private static void addUnique(
+      Map<Class<? extends PTransform>, TransformTranslator> registry,
+      Class<? extends PTransform> transformClass,
+      TransformTranslator<?> transformTranslator) {
+    TransformTranslator previous = registry.put(transformClass, transformTranslator);
+    if (previous != null) {
+      throw new IllegalArgumentException("defining multiple translators for " + transformClass);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
new file mode 100644
index 0000000..736c840
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Strategy for converting one kind of Beam {@link PTransform} into Tez constructs.
+ *
+ * <p>Implementations record their results (for example data sinks) in the supplied
+ * {@link TranslationContext}.
+ *
+ * @param <TransformT> the transform type handled by this translator
+ */
+interface TransformTranslator<TransformT extends PTransform<?, ?>> extends Serializable {
+
+  /**
+   * Translates {@code transform} using the state held in {@code context}.
+   *
+   * @param transform the Beam transform to translate
+   * @param context translation state for the transform currently being visited
+   */
+  void translate(TransformT transform, TranslationContext context);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
new file mode 100644
index 0000000..1bffe95
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ *
+ * <p>Tracks each {@link Vertex} together with the {@link PValue}s it consumes and produces.
+ * {@link #populateDAG(DAG)} turns those relationships into {@link Edge}s, data sources and
+ * data sinks.
+ */
+public class TranslationContext {
+
+  private final TezPipelineOptions pipelineOptions;
+  private final TezConfiguration config;
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private String currentName;
+  private Map<TupleTag<?>, PValue> currentInputs;
+  private Map<TupleTag<?>, PValue> currentOutputs;
+
+  /** All registered vertices, keyed by the name passed to {@link #addVertex}. */
+  private final Map<String, Vertex> vertexMap = new HashMap<>();
+
+  /**
+   * Every vertex that consumes a given value. A value may feed several vertices (fan-out), so
+   * all consumers are kept rather than only the most recently registered one.
+   */
+  private final Map<PValue, List<Vertex>> vertexInputMap = new HashMap<>();
+
+  /** The vertex registered as producing a given value. */
+  private final Map<PValue, Vertex> vertexOutputMap = new HashMap<>();
+
+  /** (input, output) value pairs joined by a shuffle edge. */
+  private final Set<Pair<PValue, PValue>> shuffleSet = new HashSet<>();
+
+  private final Map<PValue, DataSourceDescriptor> sourceMap = new HashMap<>();
+  private final Map<PValue, DataSinkDescriptor> sinkMap = new HashMap<>();
+
+  public TranslationContext(TezPipelineOptions options, TezConfiguration config) {
+    this.pipelineOptions = options;
+    this.config = config;
+  }
+
+  /** Makes {@code treeNode} the transform whose name, inputs and outputs the getters return. */
+  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
+    this.currentTransform = treeNode.toAppliedPTransform();
+    this.currentInputs = treeNode.getInputs();
+    this.currentOutputs = treeNode.getOutputs();
+    this.currentName = treeNode.getFullName();
+  }
+
+  /**
+   * Registers a vertex that reads {@code input} and produces {@code output}.
+   *
+   * @param name unique vertex name
+   * @param vertex the Tez vertex
+   * @param input value consumed by the vertex
+   * @param output value produced by the vertex
+   */
+  public void addVertex(String name, Vertex vertex, PValue input, PValue output) {
+    vertexMap.put(name, vertex);
+    List<Vertex> consumers = vertexInputMap.computeIfAbsent(input, unused -> new ArrayList<>());
+    // Guard against double registration, which would otherwise create duplicate edges.
+    if (!consumers.contains(vertex)) {
+      consumers.add(vertex);
+    }
+    vertexOutputMap.put(output, vertex);
+  }
+
+  /** Records that {@code input} must be shuffled to produce {@code output}. */
+  public void addShufflePair(PValue input, PValue output) {
+    shuffleSet.add(Pair.of(input, output));
+  }
+
+  public Set<Pair<PValue, PValue>> getShuffleSet() {
+    return this.shuffleSet;
+  }
+
+  /** Registers a data source that supplies {@code output} to the vertices consuming it. */
+  public void addSource(PValue output, DataSourceDescriptor dataSource) {
+    sourceMap.put(output, dataSource);
+  }
+
+  /** Registers a data sink that receives {@code input} from the vertex producing it. */
+  public void addSink(PValue input, DataSinkDescriptor dataSink) {
+    sinkMap.put(input, dataSink);
+  }
+
+  public TezConfiguration getConfig() {
+    return config;
+  }
+
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
+  public String getCurrentName() {
+    return currentName;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentInputs() {
+    return currentInputs;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentOutputs() {
+    return currentOutputs;
+  }
+
+  /**
+   * Populates the given Tez dag with the context's {@link Vertex}es, data sources, data sinks
+   * and {@link Edge}s.
+   *
+   * @param dag empty Tez dag to be populated
+   * @throws IllegalStateException if a shuffle pair has no producing or no consuming vertex
+   */
+  public void populateDAG(DAG dag) {
+    for (Vertex v : vertexMap.values()) {
+      dag.addVertex(v);
+    }
+
+    // Attach each data source to every vertex that reads the source's value.
+    sourceMap.forEach((value, source) -> {
+      for (Vertex consumer : consumersOf(value)) {
+        consumer.addDataSource(value.getName(), source);
+      }
+    });
+
+    // Attach each data sink to the vertex that produces the sink's value.
+    sinkMap.forEach((value, sink) -> {
+      Vertex producer = vertexOutputMap.get(value);
+      if (producer != null) {
+        producer.addDataSink(value.getName(), sink);
+      }
+    });
+
+    // Shuffle edges registered through addShufflePair:
+    // producer of the left value -> each consumer of the right value.
+    OrderedPartitionedKVEdgeConfig shuffleConfig = OrderedPartitionedKVEdgeConfig.newBuilder(
+        BytesWritable.class.getName(), BytesWritable.class.getName(),
+        HashPartitioner.class.getName()).build();
+    for (Pair<PValue, PValue> pair : shuffleSet) {
+      Vertex producer = vertexOutputMap.get(pair.getLeft());
+      List<Vertex> consumers = consumersOf(pair.getRight());
+      if (producer == null || consumers.isEmpty()) {
+        throw new IllegalStateException("Cannot create shuffle edge " + pair.getLeft() + " -> "
+            + pair.getRight() + ": no vertex produces the input or consumes the output");
+      }
+      for (Vertex consumer : consumers) {
+        dag.addEdge(Edge.create(producer, consumer, shuffleConfig.createDefaultEdgeProperty()));
+      }
+    }
+
+    // One-to-one edges: a value produced by one vertex and consumed by another. A direct lookup
+    // replaces the former scan over every (input, output) combination.
+    UnorderedKVEdgeConfig directConfig = UnorderedKVEdgeConfig.newBuilder(
+        BytesWritable.class.getName(), BytesWritable.class.getName()).build();
+    vertexInputMap.forEach((value, consumers) -> {
+      Vertex producer = value == null ? null : vertexOutputMap.get(value);
+      if (producer == null) {
+        return;
+      }
+      for (Vertex consumer : consumers) {
+        dag.addEdge(Edge.create(
+            producer, consumer, directConfig.createDefaultOneToOneEdgeProperty()));
+      }
+    });
+  }
+
+  /** Returns every vertex registered as consuming {@code value}, or an empty list. */
+  private List<Vertex> consumersOf(PValue value) {
+    return vertexInputMap.getOrDefault(value, Collections.emptyList());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
new file mode 100644
index 0000000..32b3ad0
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.util.List;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Translator Utilities to convert between hadoop and java types.
+ *
+ * <p>Object payloads use Java serialization. Only feed these helpers data produced by this
+ * runner; Java deserialization of untrusted bytes is unsafe.
+ */
+public class TranslatorUtil {
+
+  /**
+   * Serializes a java object and wraps the bytes in a {@link BytesWritable} for hadoop use.
+   *
+   * @param element java object to be converted; must be {@link Serializable} or null
+   * @return BytesWritable wrapped object
+   * @throws RuntimeException if the object cannot be serialized (the failure is the cause)
+   */
+  public static Object convertToBytesWritable(Object element) {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(bos)) {
+      out.writeObject(element);
+      out.flush();
+      // toByteArray() never returns null, so no null check is needed.
+      return new BytesWritable(bos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Failed to serialize object into byte array: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Utility to convert hadoop objects back to their java equivalent.
+   *
+   * @param element hadoop object to be converted
+   * @return original java object
+   * @throws RuntimeException if the element cannot be deserialized or its type is unsupported
+   */
+  public static Object convertToJavaType(Object element) {
+    if (element instanceof BytesWritable) {
+      BytesWritable writable = (BytesWritable) element;
+      // getBytes() exposes the backing buffer, which may be longer than the valid data.
+      // Only the first getLength() bytes belong to the serialized object.
+      try (ByteArrayInputStream bis =
+              new ByteArrayInputStream(writable.getBytes(), 0, writable.getLength());
+          ObjectInput in = new ObjectInputStream(bis)) {
+        return in.readObject();
+      } catch (IOException | ClassNotFoundException e) {
+        throw new RuntimeException(
+            "Failed to deserialize object from byte array: " + e.getMessage(), e);
+      }
+    } else if (element instanceof Text) {
+      return element.toString();
+    } else if (element instanceof BooleanWritable) {
+      return ((BooleanWritable) element).get();
+    } else if (element instanceof IntWritable) {
+      return ((IntWritable) element).get();
+    } else if (element instanceof DoubleWritable) {
+      return ((DoubleWritable) element).get();
+    } else if (element instanceof FloatWritable) {
+      return ((FloatWritable) element).get();
+    } else if (element instanceof LongWritable) {
+      return ((LongWritable) element).get();
+    } else if (element instanceof ShortWritable) {
+      return ((ShortWritable) element).get();
+    } else if (element instanceof ObjectWritable) {
+      return ((ObjectWritable) element).get();
+    }
+    Object type = element == null ? null : element.getClass();
+    throw new RuntimeException("Hadoop Type " + type + " cannot be converted to Java!");
+  }
+
+  /**
+   * Utility to convert hadoop objects within an iterable back to their java equivalent.
+   *
+   * @param iterable Iterable containing objects to be converted
+   * @return new Iterable with original java objects
+   */
+  static Iterable<Object> convertIteratorToJavaType(Iterable<Object> iterable) {
+    List<Object> converted = new ArrayList<>();
+    for (Object element : iterable) {
+      converted.add(convertToJavaType(element));
+    }
+    return converted;
+  }
+
+  /**
+   * Utility to serialize a serializable object into a Base64 string.
+   *
+   * @param object that is serializable to be serialized.
+   * @return serialized string
+   * @throws IOException thrown for serialization errors.
+   */
+  public static String toString(Serializable object) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(object);
+    }
+    // Read the bytes only after close() has flushed the object stream.
+    return Base64.getEncoder().encodeToString(baos.toByteArray());
+  }
+
+  /**
+   * Utility to deserialize a Base64 string produced by {@link #toString(Serializable)}.
+   *
+   * @param string containing serialized object.
+   * @return Original object
+   * @throws IOException thrown for serialization errors.
+   * @throws ClassNotFoundException thrown for serialization errors.
+   */
+  public static Object fromString(String string) throws IOException, ClassNotFoundException {
+    byte[] data = Base64.getDecoder().decode(string);
+    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
+      return ois.readObject();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
new file mode 100644
index 0000000..3fbb296
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.View;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link View.CreatePCollectionView} translation to Tez equivalent.
+ *
+ * <p>Not yet implemented: the transform is skipped, and a warning is logged so the omission
+ * is visible rather than silent.
+ */
+class ViewCreatePCollectionViewTranslator<ElemT, ViewT> implements
+    TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ViewCreatePCollectionViewTranslator.class);
+
+  @Override
+  public void translate(
+      View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+    // TODO: Translate transform to Tez and add to TranslationContext
+    LOG.warn("View.CreatePCollectionView is not yet supported by the Tez runner; "
+        + "skipping transform {}", context.getCurrentName());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..433e5a5
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Assign;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Assign} translation to Tez equivalent.
+ *
+ * <p>Not yet implemented: the window assignment is skipped, and a warning is logged so the
+ * omission is visible rather than silent.
+ */
+class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignTranslator.class);
+
+  @Override
+  public void translate(Assign<T> transform, TranslationContext context) {
+    // TODO: Translate transform to Tez and add to TranslationContext
+    LOG.warn("Window.Assign is not yet supported by the Tez runner; skipping transform {}",
+        context.getCurrentName());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
new file mode 100644
index 0000000..312d8ae
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.mapreduce.output.MROutput;
+
+/**
+ * Translates {@link WriteFiles} into a Tez {@link DataSinkDescriptor} backed by
+ * {@link MROutput} with a text output format.
+ */
+class WriteFilesTranslator implements TransformTranslator<WriteFiles<?>> {
+
+  /**
+   * Pulls the output directory out of the string form of the sink's base output directory
+   * provider. Compiled once because it never changes.
+   * NOTE(review): relies on the provider's toString() layout, roughly
+   * {@code ...{...{value=<path>}}...}; confirm against the Beam version in use.
+   */
+  private static final Pattern OUTPUT_DIRECTORY_PATTERN =
+      Pattern.compile(".*\\{.*\\{value=(.*)}}.*");
+
+  /**
+   * Registers an MROutput text sink for every input of the write.
+   *
+   * @throws UnsupportedOperationException if the output directory cannot be determined
+   */
+  @Override
+  public void translate(WriteFiles transform, TranslationContext context) {
+    String providerDescription =
+        transform.getSink().getBaseOutputDirectoryProvider().toString();
+    Matcher matcher = OUTPUT_DIRECTORY_PATTERN.matcher(providerDescription);
+    if (!matcher.matches()) {
+      // Without an output location no sink would be registered and the write would be dropped.
+      throw new UnsupportedOperationException(
+          "Unable to determine WriteFiles output directory from " + providerDescription);
+    }
+    String output = matcher.group(1);
+    DataSinkDescriptor dataSink = MROutput.createConfigBuilder(
+        new Configuration(context.getConfig()), TextOutputFormat.class, output).build();
+
+    context.getCurrentInputs().forEach((tag, input) -> context.addSink(input, dataSink));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
new file mode 100644
index 0000000..8305e1e
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import java.io.IOException;
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link MROutput}
+ */
+public class MROutputManager extends TezOutputManager {
+
+ private MROutput output;
+
+ public MROutputManager(LogicalOutput output) {
+ super(output);
+ if (output.getClass().equals(MROutput.class)){
+ this.output = (MROutput) output;
+ try {
+ setWriter((KeyValueWriter) output.getWriter());
+ } catch (Exception e) {
+ throw new RuntimeException("Error when retrieving writer for output" + e.getMessage());
+ }
+ } else {
+ throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+ }
+ }
+
+ @Override
+ public void after() {
+ try {
+ output.flush();
+ output.commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ try {
+ getWriter().write(null, TranslatorUtil.convertToBytesWritable(output.getValue()));
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
new file mode 100644
index 0000000..725be0a
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A {@link TezOutputManager} that discards every element handed to it.
+ *
+ * <p>Used when a {@link org.apache.tez.dag.api.Vertex} has no Tez output, for example when the
+ * ParDo running inside the vertex performs its own writes.
+ */
+public class NoOpOutputManager extends TezOutputManager {
+
+  /** Creates a manager with no backing output. */
+  public NoOpOutputManager() {
+    super(null);
+  }
+
+  /** Ignores {@code ignoredValue}; nothing is written anywhere. */
+  @Override
+  public <OutputT> void output(TupleTag<OutputT> ignoredTag, WindowedValue<OutputT> ignoredValue) {
+    // Intentionally empty: there is no downstream output to receive elements.
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
new file mode 100644
index 0000000..4a652da
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link OrderedPartitionedKVOutput}
+ */
+public class OrderedPartitionedKVOutputManager extends TezOutputManager {
+
+ private OrderedPartitionedKVOutput output;
+
+ public OrderedPartitionedKVOutputManager(LogicalOutput output) {
+ super(output);
+ if (output.getClass().equals(OrderedPartitionedKVOutput.class)){
+ this.output = (OrderedPartitionedKVOutput) output;
+ try {
+ setWriter((KeyValueWriter) output.getWriter());
+ } catch (Exception e) {
+ throw new RuntimeException("Error when retrieving writer for output" + e.getMessage());
+ }
+ } else {
+ throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+ }
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ try {
+ if (output.getValue() instanceof KV) {
+ getWriter().write(TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getKey()),
+ TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getValue()));
+ } else {
+ throw new IllegalArgumentException("GroupByKey can only group Key-Value outputs!");
+ }
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
new file mode 100644
index 0000000..807de3f
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+public class OutputManagerFactory {
+ public static TezOutputManager createOutputManager(LogicalOutput output){
+ TezOutputManager outputManager;
+ if (output.getClass().equals(OrderedPartitionedKVOutput.class)){
+ outputManager = new OrderedPartitionedKVOutputManager(output);
+ } else if (output.getClass().equals(UnorderedKVOutput.class)){
+ outputManager = new UnorderedKVEdgeOutputManager(output);
+ } else if (output.getClass().equals(MROutput.class)){
+ outputManager = new MROutputManager(output);
+ } else {
+ throw new RuntimeException("Output type: " + output.getClass() + " is unsupported");
+ }
+ return outputManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
new file mode 100644
index 0000000..2999ee5
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * Base class for the TezRunner's output managers.
+ *
+ * <p>Adds {@link #before()} and {@link #after()} hooks to the {@link DoFnRunners.OutputManager}
+ * interface so that outputs which need them can be supported by the TezRunner. It also holds
+ * state shared by concrete managers: the Tez {@link LogicalOutput} given at construction, a
+ * {@link KeyValueWriter} installed via {@link #setWriter}, and the element currently being
+ * processed.
+ */
+public abstract class TezOutputManager implements DoFnRunners.OutputManager {
+
+  private final LogicalOutput output;
+  private KeyValueWriter writer;
+  private WindowedValue currentElement;
+
+  /**
+   * @param output the Tez output this manager is bound to; stored as given, without validation
+   *     (it may be {@code null})
+   */
+  public TezOutputManager(LogicalOutput output) {
+    this.output = output;
+  }
+
+  /** Hook for subclasses; the default implementation does nothing. */
+  public void before() {}
+
+  /** Hook for subclasses; the default implementation does nothing. */
+  public void after() {}
+
+  /** Records the element currently being processed, for managers that need it. */
+  public void setCurrentElement(WindowedValue currentElement) {
+    this.currentElement = currentElement;
+  }
+
+  /** Returns the element last passed to {@link #setCurrentElement}, or {@code null} if none. */
+  public WindowedValue getCurrentElement() {
+    return this.currentElement;
+  }
+
+  /** Installs the writer later returned by {@link #getWriter()}. */
+  public void setWriter(KeyValueWriter writer) {
+    this.writer = writer;
+  }
+
+  /** Returns the writer installed via {@link #setWriter}, or {@code null} if none was set. */
+  public KeyValueWriter getWriter() {
+    return this.writer;
+  }
+
+  /** Returns the Tez output supplied to the constructor. */
+  public LogicalOutput getOutput() {
+    return this.output;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
new file mode 100644
index 0000000..34cb371
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link UnorderedKVOutput}
+ */
+public class UnorderedKVEdgeOutputManager extends TezOutputManager {
+
+ private UnorderedKVOutput output;
+
+ public UnorderedKVEdgeOutputManager(LogicalOutput output) {
+ super(output);
+ if (output.getClass().equals(UnorderedKVOutput.class)){
+ this.output = (UnorderedKVOutput) output;
+ try {
+ setWriter((KeyValueWriter) output.getWriter());
+ } catch (Exception e) {
+ throw new RuntimeException("Error when retrieving writer for output" + e.getMessage());
+ }
+ } else {
+ throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+ }
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ try {
+ getWriter().write(TranslatorUtil.convertToBytesWritable(getCurrentElement().getValue()),
+ TranslatorUtil.convertToBytesWritable(output.getValue()));
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
new file mode 100644
index 0000000..66e9f19
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez runner.
+ *
+ * <p>Each test builds the same pipeline twice, once for the {@link TezRunner} and once for the
+ * DirectRunner, and checks that both produce the same set of output elements. Output is captured
+ * by a terminal {@link DoFn} into a static set, which assumes both runners execute the DoFns
+ * inside the test JVM.
+ */
+public class TezRunnerTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = "src/test/resources/test_input.txt";
+
+  // Rebuilt by setupPipelines() for every test, so these are per-instance rather than static.
+  private Pipeline tezPipeline;
+  private Pipeline directPipeline;
+
+  @Before
+  public void setupPipelines() {
+    // TezRunner pipeline
+    PipelineOptions tezOptions = PipelineOptionsFactory.create();
+    tezOptions.setRunner(TezRunner.class);
+    tezPipeline = Pipeline.create(tezOptions);
+
+    // DirectRunner pipeline
+    PipelineOptions options = PipelineOptionsFactory.create();
+    directPipeline = Pipeline.create(options);
+
+    // Start every test with empty capture sets.
+    TestTezFn.RESULTS.clear();
+    TestDirectFn.RESULTS.clear();
+  }
+
+  @Test
+  public void simpleTest() throws Exception {
+    tezPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestTezFn()));
+
+    directPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestDirectFn()));
+
+    runAndCompare();
+  }
+
+  @Test
+  public void wordCountTest() throws Exception {
+    tezPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION))
+        .apply("TWO", ParDo.of(new TokenDoFn()))
+        .apply("THREE", GroupByKey.create())
+        .apply("FOUR", ParDo.of(new ProcessDoFn()))
+        .apply("FIVE", ParDo.of(new TestTezFn()));
+
+    directPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION))
+        .apply("TWO", ParDo.of(new TokenDoFn()))
+        .apply("THREE", GroupByKey.create())
+        .apply("FOUR", ParDo.of(new ProcessDoFn()))
+        .apply("FIVE", ParDo.of(new TestDirectFn()));
+
+    runAndCompare();
+  }
+
+  /** Runs both pipelines to completion and asserts they produced the same, non-empty output. */
+  private void runAndCompare() {
+    tezPipeline.run().waitUntilFinish();
+    directPipeline.run().waitUntilFinish();
+    // Guard against a vacuous pass where both capture sets are empty.
+    Assert.assertFalse("DirectRunner pipeline produced no output", TestDirectFn.RESULTS.isEmpty());
+    Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS);
+  }
+
+  /** Splits each line into words and emits every non-empty word suffixed with " HelloWorld". */
+  private static class AddHelloWorld extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word + " HelloWorld");
+        }
+      }
+    }
+  }
+
+  /** Emits {@code KV(word, 1)} for every non-empty word of a line. */
+  public static class TokenDoFn extends DoFn<String, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      for (String word : c.element().split(TOKENIZER_PATTERN)) {
+        if (!word.isEmpty()) {
+          c.output(KV.of(word, 1));
+        }
+      }
+    }
+  }
+
+  /** Sums the grouped counts for a word and emits {@code "word: total"}. */
+  public static class ProcessDoFn extends DoFn<KV<String, Iterable<Integer>>, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      int sum = 0;
+      for (Integer count : c.element().getValue()) {
+        sum += count;
+      }
+      c.output(c.element().getKey() + ": " + sum);
+    }
+  }
+
+  /** Captures every element of the TezRunner pipeline into {@link #RESULTS}. */
+  private static class TestTezFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+  /** Captures every element of the DirectRunner pipeline into {@link #RESULTS}. */
+  private static class TestDirectFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
new file mode 100644
index 0000000..df3532d
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the ParDoTranslator class
+ */
+public class ParDoTranslatorTest {
+
+ private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+ private static final String TEST_TAG = "TestName";
+ private static TransformHierarchy hierarchy;
+ private static PValue pvalue;
+ private static DAG dag;
+ private static TranslationContext context;
+ private static ParDoTranslator translator;
+
+ @Test
+ public void testParDoTranslation() throws Exception {
+ MultiOutput parDo = ParDo.of(new TestDoFn()).withOutputTags(new TupleTag<>(), TupleTagList.of(new TupleTag<String>()));
+ Node node = hierarchy.pushNode(TEST_TAG, pvalue, parDo);
+ hierarchy.setOutput(pvalue);
+ context.setCurrentTransform(node);
+ translator.translate(parDo, context);
+ context.populateDAG(dag);
+ Vertex vertex = Iterables.getOnlyElement(dag.getVertices());
+ Configuration config = TezUtils.createConfFromUserPayload(vertex.getProcessorDescriptor().getUserPayload());
+ String doFnString = config.get(DO_FN_INSTANCE_TAG);
+ DoFn doFn = (DoFn) TranslatorUtil.fromString(doFnString);
+
+ Assert.assertEquals(vertex.getProcessorDescriptor().getClassName(), TezDoFnProcessor.class.getName());
+ Assert.assertEquals(doFn.getClass(), TestDoFn.class);
+ }
+
+ @Before
+ public void setupTest(){
+ dag = DAG.create(TEST_TAG);
+ translator = new ParDoTranslator();
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(TezRunner.class);
+ TezPipelineOptions tezOptions = PipelineOptionsValidator.validate(TezPipelineOptions.class, options);
+ context = new TranslationContext(tezOptions, new TezConfiguration());
+ hierarchy = new TransformHierarchy(Pipeline.create());
+ PValue innerValue = new PValueBase() {
+ @Override
+ public String getName() {return null;}
+ };
+ pvalue = new PValue() {
+ @Override
+ public String getName() {return null;}
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ Map<TupleTag<?>, PValue> map = new HashMap<>();
+ map.put(new TupleTag<>(), innerValue);
+ return map;
+ }
+
+ @Override
+ public void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform) {}
+
+ @Override
+ public Pipeline getPipeline() {return null;}
+
+ @Override
+ public void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform) {}
+ };
+ }
+
+ private static class TestDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ //Test DoFn
+ }
+ }
+}