You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:34:41 UTC

[2/3] beam git commit: [BEAM-2709] Add TezRunner

[BEAM-2709] Add TezRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8994f07e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8994f07e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8994f07e

Branch: refs/heads/tez-runner
Commit: 8994f07e052e1958909d59896bcc4eff39019f3a
Parents: f10399d
Author: Scheller <sc...@186590cf3d5d.ant.amazon.com>
Authored: Tue Jun 20 14:03:44 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Fri Nov 17 12:33:57 2017 -0800

----------------------------------------------------------------------
 runners/pom.xml                                 |   1 +
 runners/tez/pom.xml                             | 135 +++++++++++++++
 .../beam/runners/tez/TezPipelineOptions.java    |  27 +++
 .../org/apache/beam/runners/tez/TezRunner.java  |  79 +++++++++
 .../beam/runners/tez/TezRunnerResult.java       |  78 +++++++++
 .../FlattenPCollectionTranslator.java           |  34 ++++
 .../tez/translation/GroupByKeyTranslator.java   |  43 +++++
 .../tez/translation/ParDoTranslator.java        |  92 ++++++++++
 .../tez/translation/ReadBoundedTranslator.java  |  43 +++++
 .../tez/translation/TezDoFnProcessor.java       | 131 +++++++++++++++
 .../tez/translation/TezPipelineTranslator.java  | 139 ++++++++++++++++
 .../tez/translation/TransformTranslator.java    |  28 ++++
 .../tez/translation/TranslationContext.java     | 166 +++++++++++++++++++
 .../runners/tez/translation/TranslatorUtil.java | 147 ++++++++++++++++
 .../ViewCreatePCollectionViewTranslator.java    |  35 ++++
 .../tez/translation/WindowAssignTranslator.java |  35 ++++
 .../tez/translation/WriteFilesTranslator.java   |  45 +++++
 .../tez/translation/io/MROutputManager.java     |  67 ++++++++
 .../tez/translation/io/NoOpOutputManager.java   |  35 ++++
 .../io/OrderedPartitionedKVOutputManager.java   |  62 +++++++
 .../translation/io/OutputManagerFactory.java    |  39 +++++
 .../tez/translation/io/TezOutputManager.java    |  62 +++++++
 .../io/UnorderedKVEdgeOutputManager.java        |  57 +++++++
 .../apache/beam/runners/tez/TezRunnerTest.java  | 155 +++++++++++++++++
 .../tez/translation/ParDoTranslatorTest.java    | 120 ++++++++++++++
 .../tez/translation/TezDoFnProcessorTest.java   | 112 +++++++++++++
 .../tez/translation/TranslationContextTest.java | 110 ++++++++++++
 .../tez/translation/TranslatorUtilTest.java     |  43 +++++
 runners/tez/src/test/resources/test_input.txt   |   2 +
 29 files changed, 2122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 47f3c0e..2892fe8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -65,6 +65,7 @@
       </activation>
       <modules>
         <module>gearpump</module>
+        <module>tez</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/pom.xml
----------------------------------------------------------------------
diff --git a/runners/tez/pom.xml b/runners/tez/pom.xml
new file mode 100644
index 0000000..b7d0d6d
--- /dev/null
+++ b/runners/tez/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+
+  </build>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.0.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-tez</artifactId>
+  <version>2.0.0</version>
+
+  <name>Apache Beam :: Runners :: Tez</name>
+
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <!-- Tez -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>0.8.4</version>
+    </dependency>
+
+    <!-- Beam -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
new file mode 100644
index 0000000..8b37b09
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Options that configure the Tez pipeline. No Tez-specific options are declared yet.
+ */
+public interface TezPipelineOptions extends PipelineOptions, java.io.Serializable {
+  //TODO: Add options to configure Tez
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
new file mode 100644
index 0000000..7d32b47
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.tez.translation.TezPipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PipelineRunner} that translates the pipeline to a Tez DAG and submits it
+ * through a {@link TezClient}. Only Tez local mode is configured so far. Failures while
+ * starting the client or submitting the DAG are rethrown as {@link RuntimeException}.
+ */
+public class TezRunner extends PipelineRunner<TezRunnerResult>{
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezRunner.class);
+
+  private final TezPipelineOptions options;
+
+  private TezRunner(TezPipelineOptions options){
+    this.options = options;
+  }
+
+  /** Creates a runner from {@code options} after validating them as {@link TezPipelineOptions}. */
+  public static TezRunner fromOptions(PipelineOptions options) {
+    TezPipelineOptions tezOptions = PipelineOptionsValidator.validate(TezPipelineOptions.class,options);
+    return new TezRunner(tezOptions);
+  }
+
+  /** Translates {@code pipeline} into a Tez DAG and submits it without waiting for completion. */
+  @Override
+  public TezRunnerResult run(Pipeline pipeline) {
+    //Setup Tez Local Config
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    //TODO: Support Remote Tez Configuration
+
+    final TezPipelineTranslator translator = new TezPipelineTranslator(options, config);
+    DAG dag = DAG.create(options.getJobName());
+    translator.translate(pipeline, dag);
+
+    TezClient client = TezClient.create("TezRun", config);
+    try {
+      client.start();
+      client.submitDAG(dag);
+    } catch (Exception e){
+      throw new RuntimeException("Failed to start the Tez client or submit the DAG", e);
+    }
+
+    return new TezRunnerResult(client);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
new file mode 100644
index 0000000..870c43c
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.client.TezClient;
+import org.joda.time.Duration;
+
+/**
+ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Tez.
+ *
+ * <p>Completion is detected by polling the {@link TezClient} app master status; the pipeline is
+ * reported as {@link State#DONE} once that status is {@code SHUTDOWN}.
+ */
+public class TezRunnerResult implements PipelineResult {
+
+  private final TezClient client;
+  private State state = State.UNKNOWN;
+
+  public TezRunnerResult(TezClient client){
+    this.client = client;
+  }
+
+  /** Returns the last state recorded by {@link #waitUntilFinish(Duration)}, initially UNKNOWN. */
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  /** Waits without a deadline; equivalent to {@code waitUntilFinish(null)}. */
+  @Override
+  public State waitUntilFinish() {
+    return waitUntilFinish(null);
+  }
+
+  /**
+   * Polls the Tez app master every 500 ms until it has shut down or the deadline passes.
+   *
+   * @param duration maximum time to wait; {@code null} or less than 1 ms waits without a deadline
+   * @return {@link State#DONE} once the app master has shut down, or {@code null} on timeout
+   * @throws RuntimeException if polling fails or the waiting thread is interrupted (the
+   *     interrupt flag is restored before rethrowing)
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + duration.getMillis();
+    try {
+      while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN && System.currentTimeMillis() < timeout) {
+        Thread.sleep(500);
+      }
+      if (!client.getAppMasterStatus().equals(TezAppMasterStatus.SHUTDOWN)){
+        return null;
+      }
+      //Record completion so getState() and cancel() no longer report UNKNOWN
+      state = State.DONE;
+      return state;
+    } catch (InterruptedException e){
+      //Restore the interrupt flag so callers further up can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Cancellation is not implemented yet; returns the current state unchanged. */
+  @Override
+  public State cancel() throws IOException {
+    //TODO: CODE TO CANCEL PIPELINE
+    return state;
+  }
+
+  /** Metrics are not supported by this runner result. */
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
new file mode 100644
index 0000000..f1f5aee
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Flatten} translation to Tez equivalent. Not implemented yet: {@code translate} is a no-op.
+ */
+class FlattenPCollectionTranslator<T> implements TransformTranslator<Flatten.PCollections<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslator.class);
+
+  @Override
+  public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..8f95752
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link GroupByKey} translation: registers its single input/output pair as a Tez shuffle.
+ */
+class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTranslator.class);
+
+  @Override
+  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
+    if (context.getCurrentInputs().size() > 1 ){
+      throw new RuntimeException("Multiple Inputs are not yet supported");
+    } else if (context.getCurrentOutputs().size() > 1){
+      throw new RuntimeException("Multiple Outputs are not yet supported");
+    }
+    PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values());
+    context.addShufflePair(input, output);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
new file mode 100644
index 0000000..9ce11e6
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates {@code ParDo} into a Tez {@link Vertex} whose payload carries the serialized DoFn.
+ */
+class ParDoTranslator<InputT, OutputT> implements TransformTranslator<MultiOutput<InputT, OutputT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class);
+  private static final String OUTPUT_TAG = "OUTPUT_TAG";
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+  @Override
+  public void translate(MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    //Prepare input/output targets: exactly one of each is supported for now
+    if (context.getCurrentInputs().size() > 1){
+      throw new NotImplementedException("Multiple Inputs are not yet supported");
+    } else if (context.getCurrentOutputs().size() > 1){
+      throw new NotImplementedException("Multiple Outputs are not yet supported");
+    }
+    PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values());
+    PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values());
+
+    //Prepare UserPayload Configuration: serialized DoFn plus the id of the main output tag
+    DoFn<InputT, OutputT> doFn = transform.getFn();
+    String doFnInstance;
+    try {
+      doFnInstance = TranslatorUtil.toString(doFn);
+    } catch ( IOException e){
+      throw new RuntimeException("DoFn failed to serialize: " + e.getMessage(), e);
+    }
+    Configuration config = new Configuration();
+    config.set(OUTPUT_TAG, transform.getMainOutputTag().getId());
+    config.set(DO_FN_INSTANCE_TAG, doFnInstance);
+
+    //Check for shuffle input: the input is the output side of a recorded shuffle pair
+    boolean shuffle = false;
+    for (Pair<PValue, PValue> pair : context.getShuffleSet()){
+      if (pair.getRight().equals(input)){
+        shuffle = true;
+      }
+    }
+
+    //Create Vertex with Payload; the original cause is kept on failure for diagnosis
+    try {
+      UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+      Vertex vertex;
+      if (shuffle) {
+        vertex = Vertex.create(context.getCurrentName(), ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload), 1);
+        //TODO: add customizable parallelism
+      } else {
+        vertex = Vertex.create(context.getCurrentName(), ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload));
+      }
+      context.addVertex(context.getCurrentName(), vertex, input, output);
+    } catch (Exception e){
+      throw new RuntimeException("Vertex Translation Failure from: " + e.getMessage(), e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
new file mode 100644
index 0000000..3192a81
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Bounded} translation to a Tez {@link DataSourceDescriptor} built from {@link MRInput}.
+ */
+class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadBoundedTranslator.class);
+
+  @Override
+  public void translate(Bounded<T> transform, TranslationContext context) {
+    //Build a text-format datasource from the source's string form and register it per output
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(context.getConfig()),
+        TextInputFormat.class, transform.getSource().toString()).build();
+    //TODO: Support Configurable Input Formats
+    context.getCurrentOutputs().forEach( (a, b) -> context.addSource(b, dataSource));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
new file mode 100644
index 0000000..0fde90c
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.LinkedList;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.tez.translation.io.MROutputManager;
+import org.apache.beam.runners.tez.translation.io.NoOpOutputManager;
+import org.apache.beam.runners.tez.translation.io.OrderedPartitionedKVOutputManager;
+import org.apache.beam.runners.tez.translation.io.OutputManagerFactory;
+import org.apache.beam.runners.tez.translation.io.TezOutputManager;
+import org.apache.beam.runners.tez.translation.io.UnorderedKVEdgeOutputManager;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+/**
+ * TezDoFnProcessor is the Tez wrapper that runs a user-defined {@link DoFn} inside a Tez task.
+ * The DoFn is received through the {@link UserPayload} and then run using the simple {@link DoFnRunner}.
+ */
+public class TezDoFnProcessor extends SimpleProcessor {
+
+  private static final String OUTPUT_TAG = "OUTPUT_TAG";
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+
+  private DoFn<?,?> theDoFn;
+  private String outputTag;
+
+  public TezDoFnProcessor(ProcessorContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    Configuration config = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    outputTag = config.get(OUTPUT_TAG, null);
+    String doFnInstance = config.get(DO_FN_INSTANCE_TAG, null);
+    theDoFn = (DoFn) TranslatorUtil.fromString(doFnInstance);
+    super.initialize();
+  }
+
+  @Override
+  public void run() throws Exception {
+    //Setup Reader: exactly one input is expected; it must yield key/value or key/values records
+    KeyValueReader kvReader = null;
+    KeyValuesReader kvsReader = null;
+    LogicalInput input = Iterables.getOnlyElement(getInputs().values());
+    Reader reader = input.getReader();
+    if (reader instanceof KeyValueReader) {
+      kvReader = (KeyValueReader) reader;
+    } else if (reader instanceof KeyValuesReader) {
+      kvsReader = (KeyValuesReader) reader;
+    } else {
+      throw new RuntimeException("UNSUPPORTED READER!");
+    }
+
+    //Setup Writer: one output gets a manager from the factory, no output gets a no-op manager
+    TezOutputManager outputManager;
+    if (getOutputs().size() == 1){
+      LogicalOutput output = Iterables.getOnlyElement(getOutputs().values());
+      outputManager = OutputManagerFactory.createOutputManager(output);
+      outputManager.before();
+    } else if (getOutputs().size() == 0){
+      outputManager = new NoOpOutputManager();
+    } else {
+      throw new RuntimeException("Multiple outputs not yet supported");
+    }
+
+    //Initialize DoFnRunner with default options, no side inputs and the global windowing strategy
+    DoFnRunner runner = DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), theDoFn, NullSideInputReader
+        .empty(), outputManager, new TupleTag<>(outputTag), new LinkedList<>(),
+        new FakeStepContext(), WindowingStrategy.globalDefault());
+    runner.startBundle();
+
+    //Start Runner: wrap every record in the global window and hand it to the DoFn
+    if (kvsReader != null){
+      while (kvsReader.next()){
+        outputManager.setCurrentElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey())));
+        runner.processElement(WindowedValue.valueInGlobalWindow(KV.of(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey()),
+            TranslatorUtil.convertIteratorToJavaType(kvsReader.getCurrentValues()))));
+      }
+    } else if (kvReader != null){
+      while (kvReader.next()){
+        WindowedValue value = WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentKey()));
+        outputManager.setCurrentElement(value);
+        runner.processElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentValue())));
+      }
+    } else {
+      throw new RuntimeException("UNSUPPORTED READER!");
+    }
+    //NOTE(review): after() runs before finishBundle(); confirm the DoFn cannot still emit output there
+    outputManager.after();
+    runner.finishBundle();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
new file mode 100644
index 0000000..7b4646a
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline visitor that hands every supported {@link PTransform} of a Beam {@link Pipeline}
+ * to its registered {@link TransformTranslator} and afterwards fills a Tez {@link DAG} from the
+ * {@link TranslationContext} those translators wrote to.
+ */
+public class TezPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezPipelineTranslator.class);
+
+  /**
+   * Translators for primitive transforms, looked up by the exact class of the
+   * {@link PTransform} being visited.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  /**
+   * Translators for composite transforms that are translated as one unit; when one matches,
+   * the visitor answers {@code DO_NOT_ENTER_TRANSFORM} for that composite.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      compositeTransformTranslators = new HashMap<>();
+
+  private final TranslationContext translationContext;
+
+  static {
+    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator<>());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator<>());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator<>());
+    registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionTranslator<>());
+    registerTransformTranslator(View.CreatePCollectionView.class, new ViewCreatePCollectionViewTranslator<>());
+    registerCompositeTransformTranslator(WriteFiles.class, new WriteFilesTranslator());
+  }
+
+  /**
+   * Creates a translator with a fresh {@link TranslationContext} built from the given settings.
+   * @param options Pipeline options passed on to the translation context.
+   * @param config Tez configuration passed on to the translation context.
+   */
+  public TezPipelineTranslator(TezPipelineOptions options, TezConfiguration config){
+    this.translationContext = new TranslationContext(options, config);
+  }
+
+  /**
+   * Visits the whole pipeline with this visitor, then lets the context populate the dag.
+   * @param pipeline Pipeline to translate.
+   * @param dag Tez dag that receives the translated pipeline.
+   */
+  public void translate(Pipeline pipeline, DAG dag) {
+    pipeline.traverseTopologically(this);
+    translationContext.populateDAG(dag);
+  }
+
+  /**
+   * Main visitor method called on each {@link PTransform} to transform them to Tez objects.
+   * @param node Pipeline node containing {@link PTransform} to be translated.
+   * @throws UnsupportedOperationException if no translator is registered for the transform class.
+   */
+  @Override
+  public void visitPrimitiveTransform(Node node) {
+    PTransform primitive = node.getTransform();
+    LOG.debug("visiting transform {}", primitive);
+    TransformTranslator handler = transformTranslators.get(primitive.getClass());
+    if (handler != null) {
+      translationContext.setCurrentTransform(node);
+      handler.translate(primitive, translationContext);
+      return;
+    }
+    throw new UnsupportedOperationException("no translator registered for " + primitive);
+  }
+
+  /**
+   * Translates a composite as a whole when a composite translator is registered for its class;
+   * otherwise asks the traversal to descend into the composite.
+   * @param node Pipeline node of the composite being entered.
+   * @return {@code DO_NOT_ENTER_TRANSFORM} if the composite was translated here,
+   *     {@code ENTER_TRANSFORM} otherwise.
+   */
+  @Override
+  public CompositeBehavior enterCompositeTransform(Node node) {
+    PTransform composite = node.getTransform();
+    LOG.debug("entering composite transform {}", composite);
+    if (composite == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    TransformTranslator handler = compositeTransformTranslators.get(composite.getClass());
+    if (handler == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+    translationContext.setCurrentTransform(node);
+    handler.translate(composite, translationContext);
+    return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+  }
+
+  /** Only logs the composite being left. */
+  @Override
+  public void leaveCompositeTransform(Node node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  /** Only logs the visited value. */
+  @Override
+  public void visitValue(PValue value, Node producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Records that instances of the specified PTransform class
+   * should be translated by default by the corresponding
+   * {@link TransformTranslator}.
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass, TransformTranslator<? extends TransformT> transformTranslator) {
+    register(transformTranslators, transformClass, transformTranslator);
+  }
+
+  /**
+   * Records that composites of the specified PTransform class should be translated as a whole
+   * by the corresponding {@link TransformTranslator}.
+   */
+  private static <TransformT extends PTransform> void registerCompositeTransformTranslator(
+      Class<TransformT> transformClass, TransformTranslator<? extends TransformT> transformTranslator) {
+    register(compositeTransformTranslators, transformClass, transformTranslator);
+  }
+
+  /**
+   * Stores the translator in the given registry and rejects a second registration for a class.
+   */
+  private static void register(
+      Map<Class<? extends PTransform>, TransformTranslator> registry,
+      Class<? extends PTransform> transformClass,
+      TransformTranslator transformTranslator) {
+    TransformTranslator previous = registry.put(transformClass, transformTranslator);
+    if (previous != null) {
+      throw new IllegalArgumentException("defining multiple translators for " + transformClass);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
new file mode 100644
index 0000000..736c840
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Translates {@link PTransform} to Tez functions.
+ *
+ * @param <T> the kind of {@link PTransform} this translator handles
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
+  /**
+   * Translates the given transform, with the {@link TranslationContext} available for reading
+   * translation state and recording the result.
+   *
+   * @param transform the transform to translate
+   * @param context the translation context handed to the translator
+   */
+  void translate(T transform, TranslationContext context);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
new file mode 100644
index 0000000..1bffe95
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ * Tracks and maintains each individual {@link Vertex} and their {@link Edge} connections.
+ */
+public class TranslationContext {
+
+  private final TezPipelineOptions pipelineOptions;
+  private final TezConfiguration config;
+
+  // State of the transform currently being translated; overwritten by setCurrentTransform.
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  private String currentName;
+  private Map<TupleTag<?>, PValue> currentInputs;
+  private Map<TupleTag<?>, PValue> currentOutputs;
+
+  // Registered vertices by name, by the value they consume, and by the value they produce.
+  private Map<String, Vertex> vertexMap = new HashMap<>();
+  private Map<PValue, Vertex> vertexInputMap = new HashMap<>();
+  private Map<PValue, Vertex> vertexOutputMap = new HashMap<>();
+
+  // (input, output) value pairs that must be connected by a shuffle edge.
+  private Set<Pair<PValue, PValue>> shuffleSet = new HashSet<>();
+
+  private Map<PValue, DataSourceDescriptor> sourceMap = new HashMap<>();
+  private Map<PValue, DataSinkDescriptor> sinkMap = new HashMap<>();
+
+  public TranslationContext(TezPipelineOptions options, TezConfiguration config){
+    this.pipelineOptions = options;
+    this.config = config;
+  }
+
+  /**
+   * Makes the given node the current transform and caches its inputs, outputs and full name.
+   * @param treeNode Pipeline node that is about to be translated.
+   */
+  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
+    this.currentTransform = treeNode.toAppliedPTransform();
+    this.currentInputs = treeNode.getInputs();
+    this.currentOutputs = treeNode.getOutputs();
+    this.currentName = treeNode.getFullName();
+  }
+
+  /**
+   * Registers a vertex under its name and remembers which value it consumes and produces.
+   * @param name Key under which the vertex is stored.
+   * @param vertex Vertex to add to the dag later.
+   * @param input Value consumed by the vertex.
+   * @param output Value produced by the vertex.
+   */
+  public void addVertex(String name, Vertex vertex, PValue input, PValue output) {
+    vertexMap.put(name, vertex);
+    vertexInputMap.put(input, vertex);
+    vertexOutputMap.put(output, vertex);
+  }
+
+  /**
+   * Records that the producer of {@code input} and the consumer of {@code output} must be
+   * connected by a shuffle edge.
+   */
+  public void addShufflePair(PValue input, PValue output) {
+    shuffleSet.add(Pair.of(input, output));
+  }
+
+  /** Returns the live set of recorded shuffle pairs (not a copy). */
+  public Set<Pair<PValue, PValue>> getShuffleSet(){
+    return this.shuffleSet;
+  }
+
+  /** Records the data source that provides the given value. */
+  public void addSource(PValue output, DataSourceDescriptor dataSource) {
+    sourceMap.put(output, dataSource);
+  }
+
+  /** Records the data sink that receives the given value. */
+  public void addSink(PValue input, DataSinkDescriptor dataSink) {
+    sinkMap.put(input, dataSink);
+  }
+
+  public TezConfiguration getConfig() {
+    return config;
+  }
+
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
+  public String getCurrentName() {
+    return currentName;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentInputs() {
+    return currentInputs;
+  }
+
+  public Map<TupleTag<?>, PValue> getCurrentOutputs() {
+    return currentOutputs;
+  }
+
+  /**
+   * Populates the given Tez dag with the context's {@link Vertex} and {@link Edge}.
+   * Vertices are added first, then sources and sinks are attached to the vertices that consume
+   * or produce their value, then shuffle edges and finally the remaining one-to-one edges.
+   * @param dag Empty Tez dag to be populated.
+   */
+  public void populateDAG(DAG dag){
+
+    for (Vertex v : vertexMap.values()){
+      dag.addVertex(v);
+    }
+
+    //Add Sources
+    // A source feeds the vertex that consumes its value; sources nobody consumes are skipped.
+    sourceMap.forEach( (value, source) -> {
+      Vertex vertex = vertexInputMap.get(value);
+      if (vertex != null){
+        vertex.addDataSource(value.getName(), source);
+      }
+    });
+
+    //Add Sinks
+    // A sink drains the vertex that produces its value; sinks without a producer are skipped.
+    sinkMap.forEach( (value, sink) -> {
+      Vertex vertex = vertexOutputMap.get(value);
+      if (vertex != null){
+        vertex.addDataSink(value.getName(), sink);
+      }
+    });
+
+    //Add Shuffle Edges
+    for (Pair<PValue, PValue> pair : shuffleSet){
+      Vertex inputVertex = vertexOutputMap.get(pair.getLeft());
+      Vertex outputVertex = vertexInputMap.get(pair.getRight());
+      OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig.newBuilder(
+          BytesWritable.class.getName(), BytesWritable.class.getName(), HashPartitioner.class.getName()).build();
+      dag.addEdge(Edge.create(inputVertex, outputVertex, edgeConfig.createDefaultEdgeProperty()));
+    }
+
+    //Add Edges
+    // A vertex consuming a value is wired to the vertex registered as producing that same value.
+    // Both maps are keyed by the value, so one lookup per consumer replaces the former scan of
+    // every producer for every consumer.
+    for (Map.Entry<PValue, Vertex> consumer : vertexInputMap.entrySet()) {
+      Vertex producerVertex = vertexOutputMap.get(consumer.getKey());
+      if (producerVertex != null) {
+        UnorderedKVEdgeConfig edgeConfig = UnorderedKVEdgeConfig.newBuilder(BytesWritable.class.getName(),
+            BytesWritable.class.getName()).build();
+        dag.addEdge(Edge.create(producerVertex, consumer.getValue(),
+            edgeConfig.createDefaultOneToOneEdgeProperty()));
+      }
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
new file mode 100644
index 0000000..32b3ad0
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import org.apache.beam.sdk.transforms.DoFn;
+import java.util.List;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Translator Utilities to convert between hadoop and java types.
+ */
+public class TranslatorUtil {
+
+  /**
+   * Utility to convert java objects to bytes and place them in BytesWritable wrapper for hadoop use.
+   * @param element java object to be converted
+   * @return BytesWritable wrapped object
+   */
+  public static Object convertToBytesWritable(Object element) {
+    byte[] bytes;
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(bos)) {
+      out.writeObject(element);
+      out.flush();
+      bytes = bos.toByteArray();
+    } catch (Exception e){
+      throw new RuntimeException("Failed to serialize object into byte array: " + e.getMessage());
+    }
+    if (bytes != null) {
+      return new BytesWritable(bytes);
+    } else {
+      throw new RuntimeException("Cannot convert null element to BytesWritable!");
+    }
+  }
+
+  /**
+   * Utility to convert hadoop objects back to their java equivalent.
+   * @param element hadoop object to be converted
+   * @return original java object
+   */
+  public static Object convertToJavaType(Object element) {
+    Object returnValue;
+    if (element instanceof BytesWritable){
+      BytesWritable myElement = (BytesWritable) element;
+      byte[] data = myElement.getBytes();
+      try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+          ObjectInput in = new ObjectInputStream(bis)) {
+        returnValue = in.readObject();
+      } catch (Exception e){
+        throw new RuntimeException("Failed to deserialize object from byte array: " + e.getMessage());
+      }
+    } else if (element instanceof Text) {
+      returnValue = element.toString();
+    } else if (element instanceof BooleanWritable) {
+      returnValue = ((BooleanWritable) element).get();
+    } else if (element instanceof IntWritable){
+      returnValue = ((IntWritable) element).get();
+    } else if (element instanceof DoubleWritable){
+      returnValue = ((DoubleWritable) element).get();
+    } else if (element instanceof FloatWritable){
+      returnValue = ((FloatWritable) element).get();
+    } else if (element instanceof LongWritable){
+      returnValue = ((LongWritable) element).get();
+    } else if (element instanceof ShortWritable){
+      returnValue = ((ShortWritable) element).get();
+    } else if (element instanceof ObjectWritable){
+      returnValue = ((ObjectWritable) element).get();
+    } else {
+      throw new RuntimeException("Hadoop Type " + element.getClass() + " cannot be converted to Java!");
+    }
+    return returnValue;
+  }
+
+  /**
+   * Utility to convert hadoop objects within an iterable back to their java equivalent.
+   * @param iterable Iterable containing objects to be converted
+   * @return new Iterable with original java objects
+   */
+  static Iterable<Object> convertIteratorToJavaType(Iterable<Object> iterable){
+    List<Object> list = new ArrayList<>();
+    iterable.iterator().forEachRemaining((Object element) -> list.add(convertToJavaType(element)));
+    return list;
+  }
+
+  /**
+   * Utility to serialize a serializable object into a string.
+   * @param object that is serializable to be serialized.
+   * @return serialized string
+   * @throws IOException thrown for serialization errors.
+   */
+  public static String toString( Serializable object ) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(object);
+    oos.close();
+    return Base64.getEncoder().encodeToString(baos.toByteArray());
+  }
+
+  /**
+   * Utility to deserialize a string into a serializable object.
+   * @param string containing serialized object.
+   * @return Original object
+   * @throws IOException thrown for serialization errors.
+   * @throws ClassNotFoundException thrown for serialization errors.
+   */
+  public static Object fromString( String string ) throws IOException, ClassNotFoundException {
+    byte [] data = Base64.getDecoder().decode(string);
+    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+    Object object = ois.readObject();
+    ois.close();
+    return object;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
new file mode 100644
index 0000000..3fbb296
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.View;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link View.CreatePCollectionView} translation to Tez equivalent.
+ *
+ * <p>Not implemented yet: {@code translate} has an empty body, so visiting this transform
+ * records nothing in the {@link TranslationContext}.
+ */
+class ViewCreatePCollectionViewTranslator<ElemT, ViewT> implements
+    TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ViewCreatePCollectionViewTranslator.class);
+
+  /**
+   * Placeholder translation; currently does nothing with either argument.
+   */
+  @Override
+  public void translate(View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..433e5a5
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Assign;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Assign} translation to Tez equivalent.
+ *
+ * <p>Not implemented yet: {@code translate} has an empty body, so visiting this transform
+ * records nothing in the {@link TranslationContext}.
+ */
+class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignTranslator.class);
+
+  /**
+   * Placeholder translation; currently does nothing with either argument.
+   */
+  @Override
+  public void translate(Assign<T> transform, TranslationContext context) {
+    //TODO: Translate transform to Tez and add to TranslationContext
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
new file mode 100644
index 0000000..312d8ae
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.mapreduce.output.MROutput;
+
+/**
+ * {@link WriteFiles} translation to a Tez {@link DataSinkDescriptor} backed by {@link MROutput}.
+ */
+class WriteFilesTranslator implements TransformTranslator<WriteFiles<?>> {
+
+  /**
+   * Picks the output location out of the textual form of the sink's base output directory
+   * provider: the text following {@code value=} inside two nested braces.
+   * Compiled once because the expression never changes.
+   */
+  private static final Pattern OUTPUT_DIRECTORY_PATTERN =
+      Pattern.compile(".*\\{.*\\{value=(.*)}}.*");
+
+  /**
+   * Builds a text-format {@link MROutput} sink for the transform's output location and registers
+   * it for every current input of the transform.
+   *
+   * @param transform the write whose sink supplies the output location
+   * @param context supplies the Tez configuration and current inputs, and receives the sink
+   */
+  @Override
+  public void translate(WriteFiles transform, TranslationContext context) {
+    String providerText = transform.getSink().getBaseOutputDirectoryProvider().toString();
+    Matcher matcher = OUTPUT_DIRECTORY_PATTERN.matcher(providerText);
+    // NOTE(review): when the provider text does not match, no sink is registered and nothing
+    // is reported - confirm that silently skipping the write is intended.
+    if (matcher.matches()){
+      String output = matcher.group(1);
+      DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(context.getConfig()),
+          TextOutputFormat.class, output).build();
+
+      context.getCurrentInputs().forEach( (a, b) -> context.addSink(b, dataSink));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
new file mode 100644
index 0000000..8305e1e
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import java.io.IOException;
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link MROutput}
+ */
+public class MROutputManager extends TezOutputManager {
+
+  private MROutput output;
+
+  public MROutputManager(LogicalOutput output) {
+    super(output);
+    if (output.getClass().equals(MROutput.class)){
+      this.output = (MROutput) output;
+      try {
+        setWriter((KeyValueWriter) output.getWriter());
+      } catch (Exception e) {
+        throw new RuntimeException("Error when retrieving writer for output" + e.getMessage());
+      }
+    } else {
+      throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+    }
+  }
+
+  @Override
+  public void after() {
+    try {
+      output.flush();
+      output.commit();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    try {
+      getWriter().write(null, TranslatorUtil.convertToBytesWritable(output.getValue()));
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
new file mode 100644
index 0000000..725be0a
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * {@link TezOutputManager} implementation used when the {@link org.apache.tez.dag.api.Vertex}
+ * has no output, for example when the ParDo within the Vertex writes the output itself.
+ */
+public class NoOpOutputManager extends TezOutputManager {
+
+  /** Creates a manager that is not backed by any logical output. */
+  public NoOpOutputManager() {
+    super(null);
+  }
+
+  /** Does nothing: the element is discarded and nothing is written. */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    // Intentionally empty.
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
new file mode 100644
index 0000000..4a652da
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that properly writes output to {@link OrderedPartitionedKVOutput}
+ */
+public class OrderedPartitionedKVOutputManager extends TezOutputManager {
+
+  private OrderedPartitionedKVOutput output;
+
+  public OrderedPartitionedKVOutputManager(LogicalOutput output) {
+    super(output);
+    if (output.getClass().equals(OrderedPartitionedKVOutput.class)){
+      this.output = (OrderedPartitionedKVOutput) output;
+      try {
+        setWriter((KeyValueWriter) output.getWriter());
+      } catch (Exception e) {
+        throw new RuntimeException("Error when retrieving writer for output" + e.getMessage());
+      }
+    } else {
+      throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+    }
+  }
+
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    try {
+      if (output.getValue() instanceof KV) {
+        getWriter().write(TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getKey()),
+            TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getValue()));
+      } else {
+        throw new IllegalArgumentException("GroupByKey can only group Key-Value outputs!");
+      }
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
new file mode 100644
index 0000000..807de3f
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+public class OutputManagerFactory {
+  public static TezOutputManager createOutputManager(LogicalOutput output){
+    TezOutputManager outputManager;
+    if (output.getClass().equals(OrderedPartitionedKVOutput.class)){
+      outputManager = new OrderedPartitionedKVOutputManager(output);
+    } else if (output.getClass().equals(UnorderedKVOutput.class)){
+      outputManager = new UnorderedKVEdgeOutputManager(output);
+    } else if (output.getClass().equals(MROutput.class)){
+      outputManager = new MROutputManager(output);
+    } else {
+      throw new RuntimeException("Output type: " + output.getClass() + " is unsupported");
+    }
+    return outputManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
new file mode 100644
index 0000000..2999ee5
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * Abstract output manager that extends the {@link DoFnRunners.OutputManager} interface with
+ * {@link #before()} and {@link #after()} hooks, so that outputs which require them can be used
+ * with the TezRunner. It also holds the wrapped {@link LogicalOutput}, a {@link KeyValueWriter}
+ * and the current element on behalf of its subclasses.
+ */
+public abstract class TezOutputManager implements DoFnRunners.OutputManager {
+
+  private LogicalOutput output;
+  private KeyValueWriter writer;
+  private WindowedValue currentElement;
+
+  /**
+   * @param output the logical output this manager wraps; stored as given, without validation
+   */
+  public TezOutputManager(LogicalOutput output){
+    this.output = output;
+  }
+
+  /** Hook that does nothing by default; subclasses may override it. */
+  public void before() {}
+
+  /** Hook that does nothing by default; subclasses may override it. */
+  public void after() {}
+
+  /** Returns the logical output passed to the constructor. */
+  public LogicalOutput getOutput() {
+    return output;
+  }
+
+  /** Returns the writer last stored with {@link #setWriter}, or {@code null} if none was set. */
+  public KeyValueWriter getWriter() {
+    return writer;
+  }
+
+  /** Stores the writer that subclasses use to emit key/value pairs. */
+  public void setWriter(KeyValueWriter writer) {
+    this.writer = writer;
+  }
+
+  /** Returns the element last stored with {@link #setCurrentElement}, or {@code null}. */
+  public WindowedValue getCurrentElement(){
+    return currentElement;
+  }
+
+  /** Records the element that is currently being processed. */
+  public void setCurrentElement(WindowedValue currentElement) {
+    this.currentElement = currentElement;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
new file mode 100644
index 0000000..34cb371
--- /dev/null
+++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation.io;
+
+import org.apache.beam.runners.tez.translation.TranslatorUtil;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * {@link TezOutputManager} implementation that writes output to an {@link UnorderedKVOutput}.
+ *
+ * <p>Each output element is written as a pair whose key is the value of the manager's current
+ * element and whose value is the output element's value, both passed through
+ * {@link TranslatorUtil#convertToBytesWritable}.
+ */
+public class UnorderedKVEdgeOutputManager extends TezOutputManager {
+
+  private UnorderedKVOutput output;
+
+  /**
+   * Creates a manager for the given output and stores the output's {@link KeyValueWriter}.
+   *
+   * @param output the logical output to write to; its class must be exactly
+   *     {@link UnorderedKVOutput}
+   * @throws RuntimeException if {@code output} has a different class, or if its writer cannot be
+   *     retrieved (the underlying failure is attached as the cause)
+   */
+  public UnorderedKVEdgeOutputManager(LogicalOutput output) {
+    super(output);
+    if (!output.getClass().equals(UnorderedKVOutput.class)) {
+      throw new RuntimeException("Incorrect OutputManager for: " + output.getClass());
+    }
+    this.output = (UnorderedKVOutput) output;
+    try {
+      setWriter((KeyValueWriter) output.getWriter());
+    } catch (Exception e) {
+      // Chain the original exception so its type and stack trace are not lost.
+      throw new RuntimeException(
+          "Error when retrieving writer for output: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Writes the output element keyed by the current element's value. The tag is not used.
+   *
+   * @throws RuntimeException wrapping any exception raised while converting or writing,
+   *     including the failure that occurs when no current element has been set
+   */
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    try {
+      getWriter().write(TranslatorUtil.convertToBytesWritable(getCurrentElement().getValue()),
+          TranslatorUtil.convertToBytesWritable(output.getValue()));
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
new file mode 100644
index 0000000..66e9f19
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez runner.
+ *
+ * <p>Each test builds the same transforms on two pipelines, one configured with
+ * {@link TezRunner} and one created from default options, runs both, and checks that the sets of
+ * strings collected by the final DoFn of each pipeline are equal.
+ */
+public class TezRunnerTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = "src/test/resources/test_input.txt";
+
+  // Per-test fixtures. They are instance fields because setupPipelines() is an instance method
+  // that rebuilds them before every test; the static nested DoFns below never reference them.
+  private Pipeline tezPipeline;
+  private Pipeline directPipeline;
+
+  /** Creates a fresh pair of pipelines before each test. */
+  @Before
+  public void setupPipelines(){
+    //TezRunner Pipeline
+    PipelineOptions tezOptions = PipelineOptionsFactory.create();
+    tezOptions.setRunner(TezRunner.class);
+    tezPipeline = Pipeline.create(tezOptions);
+
+    //DirectRunner Pipeline (default options; no runner is set explicitly)
+    PipelineOptions options = PipelineOptionsFactory.create();
+    directPipeline = Pipeline.create(options);
+  }
+
+  /** Read, two chained ParDos: both runners must collect the same set of strings. */
+  @Test
+  public void simpleTest() throws Exception {
+    tezPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestTezFn()));
+
+    directPipeline.apply(TextIO.read().from(INPUT_LOCATION))
+        .apply(ParDo.of(new AddHelloWorld()))
+        .apply(ParDo.of(new TestDirectFn()));
+
+    runBothAndCompare();
+  }
+
+  /** Word count with a GroupByKey: both runners must collect the same set of strings. */
+  @Test
+  public void wordCountTest() throws Exception {
+    applyWordCount(tezPipeline, new TestTezFn());
+    applyWordCount(directPipeline, new TestDirectFn());
+
+    runBothAndCompare();
+  }
+
+  /** Applies the five-step word-count transforms to {@code pipeline}, ending in {@code sink}. */
+  private static void applyWordCount(Pipeline pipeline, DoFn<String, String> sink) {
+    pipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION))
+        .apply("TWO", ParDo.of(new TokenDoFn()))
+        .apply("THREE", GroupByKey.create())
+        .apply("FOUR", ParDo.of(new ProcessDoFn()))
+        .apply("FIVE", ParDo.of(sink));
+  }
+
+  /** Runs the Tez pipeline, then the direct pipeline, and compares the collected results. */
+  private void runBothAndCompare() {
+    tezPipeline.run().waitUntilFinish();
+    directPipeline.run().waitUntilFinish();
+    // NOTE(review): two empty result sets also compare equal; consider asserting non-emptiness
+    // once the contents of the test input file are confirmed.
+    Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS);
+  }
+
+  /** Splits each line into words and emits every non-empty word followed by " HelloWorld". */
+  private static class AddHelloWorld extends DoFn<String, String>{
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word + " HelloWorld");
+        }
+      }
+    }
+  }
+
+  /** Emits {@code KV(word, 1)} for every non-empty word of the input line. */
+  public static class TokenDoFn extends DoFn<String, KV<String, Integer>>{
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      for( String word : c.element().split(TOKENIZER_PATTERN)){
+        if(!word.isEmpty()){
+          c.output(KV.of(word, 1));
+        }
+      }
+    }
+  }
+
+  /** Sums the grouped counts of a word and emits {@code "word: sum"}. */
+  public static class ProcessDoFn extends DoFn<KV<String,Iterable<Integer>>, String>{
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      // Primitive accumulator avoids boxing a new Integer on every iteration.
+      int sum = 0;
+      for (int count : c.element().getValue()) {
+        sum += count;
+      }
+      c.output(c.element().getKey() + ": " + sum);
+    }
+  }
+
+  /** Collects every element seen by the Tez pipeline into a shared static set. */
+  private static class TestTezFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    // Constructing a new instance resets the shared results from any previous test.
+    public TestTezFn(){
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+  /** Collects every element seen by the direct pipeline into a shared static set. */
+  private static class TestDirectFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    // Constructing a new instance resets the shared results from any previous test.
+    public TestDirectFn(){
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.add(c.element());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
new file mode 100644
index 0000000..df3532d
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the ParDoTranslator class
+ */
+public class ParDoTranslatorTest {
+
+  private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE";
+  private static final String TEST_TAG = "TestName";
+  private static TransformHierarchy hierarchy;
+  private static PValue pvalue;
+  private static DAG dag;
+  private static TranslationContext context;
+  private static ParDoTranslator translator;
+
+  @Test
+  public void testParDoTranslation() throws Exception {
+    MultiOutput parDo = ParDo.of(new TestDoFn()).withOutputTags(new TupleTag<>(), TupleTagList.of(new TupleTag<String>()));
+    Node node = hierarchy.pushNode(TEST_TAG, pvalue, parDo);
+    hierarchy.setOutput(pvalue);
+    context.setCurrentTransform(node);
+    translator.translate(parDo, context);
+    context.populateDAG(dag);
+    Vertex vertex = Iterables.getOnlyElement(dag.getVertices());
+    Configuration config = TezUtils.createConfFromUserPayload(vertex.getProcessorDescriptor().getUserPayload());
+    String doFnString = config.get(DO_FN_INSTANCE_TAG);
+    DoFn doFn = (DoFn) TranslatorUtil.fromString(doFnString);
+
+    Assert.assertEquals(vertex.getProcessorDescriptor().getClassName(), TezDoFnProcessor.class.getName());
+    Assert.assertEquals(doFn.getClass(), TestDoFn.class);
+  }
+
+  @Before
+  public void setupTest(){
+    dag = DAG.create(TEST_TAG);
+    translator = new ParDoTranslator();
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(TezRunner.class);
+    TezPipelineOptions tezOptions = PipelineOptionsValidator.validate(TezPipelineOptions.class, options);
+    context = new TranslationContext(tezOptions, new TezConfiguration());
+    hierarchy = new TransformHierarchy(Pipeline.create());
+    PValue innerValue = new PValueBase() {
+      @Override
+      public String getName() {return null;}
+    };
+    pvalue = new PValue() {
+      @Override
+      public String getName() {return null;}
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        Map<TupleTag<?>, PValue> map = new HashMap<>();
+        map.put(new TupleTag<>(), innerValue);
+        return map;
+      }
+
+      @Override
+      public void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform) {}
+
+      @Override
+      public Pipeline getPipeline() {return null;}
+
+      @Override
+      public void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform) {}
+    };
+  }
+
+  private static class TestDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      //Test DoFn
+    }
+  }
+}