You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:18 UTC

[09/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index e5bd108..12a3740 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -21,8 +21,8 @@ package org.apache.tez.mapreduce.hadoop;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
 import org.junit.Test;
 
 public class TestDeprecatedKeys {
@@ -43,22 +43,22 @@ public class TestDeprecatedKeys {
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, null);
 
     assertEquals(0.4f, jobConf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f);
-    assertEquals(20000l, jobConf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY, 0));
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0f), 0.01f);
+    assertEquals(20000l, jobConf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 0));
     assertEquals(2000,
-        jobConf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 0));
+        jobConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
     assertEquals(0.55f, jobConf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT, 0), 0.01f);
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0), 0.01f);
     assertEquals(0.60f,
-        jobConf.getFloat(TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS, 0),
+        jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 0),
         0.01f);
     assertEquals(0.22f,
-        jobConf.getFloat(TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT, 0),
+        jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0),
         0.01f);
     assertEquals(true, jobConf.getBoolean(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM, false));
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, false));
     assertEquals(0.33f,
-        jobConf.getFloat(TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT, 0),
+        jobConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0),
         0.01f);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 4b2c0e8..9590e72 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -48,15 +48,15 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
 
 public class MapUtils {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 89292ab..5b8eedf 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -31,19 +31,9 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
@@ -51,6 +41,16 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -85,10 +85,10 @@ public class TestMapProcessor {
     job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
     job.set(MRConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
-        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
-    job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+    job.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
     job.setNumReduceTasks(1);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 274c353..d2c7952 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -33,21 +33,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.token.Token;
-import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
-import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -58,6 +48,16 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.input.LocalMergedInput;
+import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -90,10 +90,10 @@ public class TestReduceProcessor {
     job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
     job.set(MRConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
-        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
-    job.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+    job.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
     job.setNumReduceTasks(1);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/findbugs-exclude.xml b/tez-runtime-internals/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-runtime-internals/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
new file mode 100644
index 0000000..4f64701
--- /dev/null
+++ b/tez-runtime-internals/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-runtime-internals</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>Events.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
new file mode 100644
index 0000000..df92bdc
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerContext.java
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+// TODO EVENTUALLY move this over to PB. Fix package/module.
+// TODO EVENTUALLY unit tests for functionality.
+public class ContainerContext implements Writable {
+  // Writable value object holding a container identifier string and a pid string.
+  String containerIdentifier;
+  String pid;
+  // No-arg constructor: both fields start as empty strings rather than null.
+  public ContainerContext() {
+    containerIdentifier = "";
+    pid = "";
+  }
+  // Stores both arguments as given; no null check is performed here.
+  public ContainerContext(String containerIdStr, String pid) {
+    this.containerIdentifier = containerIdStr;
+    this.pid = pid;
+  }
+  // Returns the container identifier string currently held.
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+  // Returns the pid string currently held.
+  public String getPid() {
+    return pid;
+  }
+  // Reads containerIdentifier first, then pid: the same order write() emits them.
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.containerIdentifier = Text.readString(in);
+    this.pid = Text.readString(in);
+  }
+  // Writes containerIdentifier first, then pid, each through Text.writeString.
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, containerIdentifier);
+    Text.writeString(out, pid);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
new file mode 100644
index 0000000..c865631
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class ContainerTask implements Writable {
+  // Writable pairing an optional TaskSpec with a shouldDie flag.
+  TaskSpec taskSpec;
+  boolean shouldDie;
+  // No-arg constructor: taskSpec stays null and shouldDie false until readFields().
+  public ContainerTask() {
+  }
+  // Stores both values as given; taskSpec may be null (write() handles that case).
+  public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
+    this.taskSpec = taskSpec;
+    this.shouldDie = shouldDie;
+  }
+  // Returns the held TaskSpec, or null when none was set or read.
+  public TaskSpec getTaskSpec() {
+    return taskSpec;
+  }
+  // Returns the shouldDie flag.
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+  // Wire format: shouldDie, a presence flag, then the TaskSpec only when present.
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (taskSpec != null) {
+      out.writeBoolean(true);
+      taskSpec.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+  // Mirrors write(): shouldDie, presence flag, then the TaskSpec when flagged.
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    taskSpec = null; // drop any TaskSpec left from an earlier read when this instance is reused
+    if (in.readBoolean()) { // presence flag written by write()
+      taskSpec = new TaskSpec();
+      taskSpec.readFields(in);
+    }
+  }
+  // Debug representation; the TaskSpec part prints "null" when no spec is held.
+  @Override
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", TaskSpec: "
+        + taskSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
new file mode 100644
index 0000000..1966790
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.tez.common.records.ProceedToCompletionResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+/** Protocol that task child process uses to contact its parent process.  The
+ * parent is a daemon which polls the central master for a new map or
+ * reduce task and runs it as a child process.  All communication between child
+ * and parent is via this protocol. */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+@ProtocolInfo(protocolName = "TezTaskUmbilicalProtocol", protocolVersion = 1)
+public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
+  // NOTE(review): versionID below is 19 while @ProtocolInfo declares protocolVersion = 1 - confirm which is authoritative.
+  public static final long versionID = 19L;
+  // Returns a ContainerTask for the given ContainerContext (presumably the next task to run - confirm).
+  ContainerTask getTask(ContainerContext containerContext) throws IOException;
+  // Presumably reports whether the given attempt may commit - confirm against the implementation.
+  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+  // Returns a ProceedToCompletionResponse for the given attempt; its semantics are not visible here.
+  ProceedToCompletionResponse
+      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+
+  /// Copies from TezUmbilical until complete re-factor is done
+  // TODONEWTEZ
+  // Takes a TezHeartbeatRequest and returns a TezHeartbeatResponse; unlike the methods above it also declares TezException.
+  public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+      throws IOException, TezException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..8aff6d1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.Processor;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezInputContextImpl;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+@Private
+public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
+
+  private static final Log LOG = LogFactory
+      .getLog(LogicalIOProcessorRuntimeTask.class);
+  // Input specs and the inputs built from them; initialize() and close() read both lists by the same index.
+  private final List<InputSpec> inputSpecs;
+  private final List<LogicalInput> inputs;
+  // Output specs and the outputs built from them; read by the same index as well.
+  private final List<OutputSpec> outputSpecs;
+  private final List<LogicalOutput> outputs;
+  // Context lists are appended to as each input / output is initialized.
+  private List<TezInputContext> inputContexts;
+  private List<TezOutputContext> outputContexts;
+  private TezProcessorContext processorContext;
+  // Descriptor taken from the TaskSpec, and the processor created from it in the constructor.
+  private final ProcessorDescriptor processorDescriptor;
+  private final LogicalIOProcessor processor;
+  // Seeded in the constructor with the job token bytes under the shuffle handler service id.
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
+  // Keyed by source / destination vertex name; created and filled in initialize().
+  private Map<String, LogicalInput> inputMap;
+  private Map<String, LogicalOutput> outputMap;
+  // Event queue is created in the constructor; the router thread, when non-null, is interrupted in close().
+  private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+  private Thread eventRouterThread = null;
+  // Application attempt number passed to the constructor.
+  private final int appAttemptNumber;
+
+  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
+      Configuration tezConf, TezUmbilical tezUmbilical,
+      Token<JobTokenIdentifier> jobToken) throws IOException { // builds inputs, outputs and the processor from taskSpec
+    // TODO Remove jobToken from here post TEZ-421
+    super(taskSpec, tezConf, tezUmbilical);
+    LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+        + taskSpec);
+    this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size()); // presized; filled by initializeInput()
+    this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size()); // presized; filled by initializeOutput()
+    this.inputSpecs = taskSpec.getInputs();
+    this.inputs = createInputs(inputSpecs); // helper defined outside this view
+    this.outputSpecs = taskSpec.getOutputs();
+    this.outputs = createOutputs(outputSpecs); // helper defined outside this view
+    this.processorDescriptor = taskSpec.getProcessorDescriptor();
+    this.processor = createProcessor(processorDescriptor); // helper defined outside this view
+    this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(jobToken)); // job token bytes keyed by the shuffle handler service id
+    this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
+    this.state = State.NEW; // initialize() checks for exactly this state
+    this.appAttemptNumber = appAttemptNumber;
+  }
+
+  public void initialize() throws Exception { // NEW -> INITED; sets up inputs, outputs, the processor and the router thread
+    LOG.info("Initializing LogicalIOProcessorRuntimeTask");
+    Preconditions.checkState(this.state == State.NEW, "Already initialized");
+    this.state = State.INITED; // set before the work below, so a failed initialize() cannot be called again
+    inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
+    outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
+    // Both maps keep insertion order, i.e. the order of the inputSpecs / outputSpecs lists.
+    // TODO Maybe close initialized inputs / outputs in case of failure to
+    // initialize.
+    // Initialize all inputs. TODO: Multi-threaded at some point.
+    for (int i = 0; i < inputs.size(); i++) {
+      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+      initializeInput(inputs.get(i),
+          inputSpecs.get(i));
+      inputMap.put(srcVertexName, inputs.get(i)); // registered only after its initialization returned
+    }
+
+    // Initialize all outputs. TODO: Multi-threaded at some point.
+    for (int i = 0; i < outputs.size(); i++) {
+      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+      initializeOutput(outputs.get(i), outputSpecs.get(i));
+      outputMap.put(destVertexName, outputs.get(i)); // registered only after its initialization returned
+    }
+
+    // Initialize processor.
+    initializeLogicalIOProcessor();
+    startRouterThread(); // helper defined outside this view
+  }
+
+  public void run() throws Exception { // INITED -> RUNNING, then hands both maps to the processor
+    synchronized (this.state) { // NOTE(review): the monitor is the current state object, which is reassigned inside this block - confirm this is the intended lock
+      Preconditions.checkState(this.state == State.INITED,
+          "Can only run while in INITED state. Current: " + this.state);
+      this.state = State.RUNNING;
+    }
+    LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor; // cast is a no-op: the field is already declared as LogicalIOProcessor
+    lioProcessor.run(inputMap, outputMap); // maps are created and filled by initialize()
+  }
+
+  public void close() throws Exception { // RUNNING -> CLOSED; closes inputs, then the processor, then outputs
+    try {
+      Preconditions.checkState(this.state == State.RUNNING,
+          "Can only close while in RUNNING state. Current: " + this.state);
+      this.state = State.CLOSED;
+      // Events returned by each close() call are forwarded through sendTaskGeneratedEvents.
+      // Close the Inputs.
+      for (int i = 0; i < inputs.size(); i++) {
+        String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+        List<Event> closeInputEvents = inputs.get(i).close();
+        sendTaskGeneratedEvents(closeInputEvents,
+            EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+            srcVertexName, taskSpec.getTaskAttemptID());
+      }
+
+      // Close the Processor.
+      processor.close();
+
+      // Close the Outputs.
+      for (int i = 0; i < outputs.size(); i++) {
+        String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+        List<Event> closeOutputEvents = outputs.get(i).close();
+        sendTaskGeneratedEvents(closeOutputEvents,
+            EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+            destVertexName, taskSpec.getTaskAttemptID());
+      }
+    } finally { // runs even when the state check or one of the close() calls throws
+      setTaskDone();
+      if (eventRouterThread != null) { // the field is initialized to null, so guard before interrupting
+        eventRouterThread.interrupt();
+      }
+    }
+  }
+
+  private void initializeInput(Input input, InputSpec inputSpec)
+      throws Exception {
+    TezInputContext tezInputContext = createInputContext(inputSpec);
+    inputContexts.add(tezInputContext);
+    if (input instanceof LogicalInput) {
+      ((LogicalInput) input).setNumPhysicalInputs(inputSpec
+          .getPhysicalEdgeCount());
+    }
+    LOG.info("Initializing Input using InputSpec: " + inputSpec);
+    List<Event> events = input.initialize(tezInputContext);
+    sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
+        tezInputContext.getTaskVertexName(),
+        tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
+  }
+
+  private void initializeOutput(Output output, OutputSpec outputSpec)
+      throws Exception {
+    TezOutputContext tezOutputContext = createOutputContext(outputSpec);
+    outputContexts.add(tezOutputContext);
+    if (output instanceof LogicalOutput) {
+      ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
+          .getPhysicalEdgeCount());
+    }
+    LOG.info("Initializing Output using OutputSpec: " + outputSpec);
+    List<Event> events = output.initialize(tezOutputContext);
+    sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
+        tezOutputContext.getTaskVertexName(),
+        tezOutputContext.getDestinationVertexName(),
+        taskSpec.getTaskAttemptID());
+  }
+
+  /**
+   * Creates the processor context, remembers it on this task and initializes
+   * the processor with it.
+   *
+   * @throws Exception if the processor fails to initialize
+   */
+  private void initializeLogicalIOProcessor() throws Exception {
+    LOG.info("Initializing processor"
+        + ", processorClassName=" + processorDescriptor.getClassName());
+    this.processorContext = createProcessorContext();
+    processor.initialize(this.processorContext);
+  }
+
+  /**
+   * Creates the context handed to an Input during initialization.
+   *
+   * The user payload comes from the input descriptor when it has one;
+   * otherwise the processor descriptor's payload is used.
+   *
+   * @param inputSpec spec describing the Input
+   * @return a new input context
+   */
+  private TezInputContext createInputContext(InputSpec inputSpec) {
+    return new TezInputContextImpl(tezConf,
+        appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
+        inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters,
+        inputSpec.getInputDescriptor().getUserPayload() != null
+            ? inputSpec.getInputDescriptor().getUserPayload()
+            : taskSpec.getProcessorDescriptor().getUserPayload(),
+        this, serviceConsumerMetadata);
+  }
+
+  /**
+   * Creates the context handed to an Output during initialization.
+   *
+   * The user payload comes from the output descriptor when it has one;
+   * otherwise the processor descriptor's payload is used.
+   *
+   * @param outputSpec spec describing the Output
+   * @return a new output context
+   */
+  private TezOutputContext createOutputContext(OutputSpec outputSpec) {
+    return new TezOutputContextImpl(tezConf,
+        appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
+        outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters,
+        outputSpec.getOutputDescriptor().getUserPayload() != null
+            ? outputSpec.getOutputDescriptor().getUserPayload()
+            : taskSpec.getProcessorDescriptor().getUserPayload(),
+        this, serviceConsumerMetadata);
+  }
+
+  /**
+   * Creates the context handed to the processor during initialization, using
+   * the processor descriptor's user payload.
+   *
+   * @return a new processor context
+   */
+  private TezProcessorContext createProcessorContext() {
+    return new TezProcessorContextImpl(tezConf, appAttemptNumber,
+        tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters, processorDescriptor.getUserPayload(), this,
+        serviceConsumerMetadata);
+  }
+
+  /**
+   * Instantiates one Input per spec, in spec order.
+   *
+   * @param inputSpecs specs naming the Input classes to instantiate
+   * @return the instantiated inputs, in the same order as {@code inputSpecs}
+   * @throws TezUncheckedException if an instantiated class is not a
+   *           LogicalInput
+   */
+  private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
+    List<LogicalInput> created = new ArrayList<LogicalInput>(inputSpecs.size());
+    for (InputSpec spec : inputSpecs) {
+      LOG.info("Creating Input from InputSpec: "
+          + spec);
+      String inputClassName = spec.getInputDescriptor().getClassName();
+      Input candidate = RuntimeUtils.createClazzInstance(inputClassName);
+
+      // Only logical inputs can be driven by a LogicalIOProcessor.
+      if (!(candidate instanceof LogicalInput)) {
+        throw new TezUncheckedException(candidate.getClass().getName()
+            + " is not a sub-type of LogicalInput."
+            + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
+      }
+      created.add((LogicalInput) candidate);
+    }
+    return created;
+  }
+
+  /**
+   * Instantiates one Output per spec, in spec order.
+   *
+   * @param outputSpecs specs naming the Output classes to instantiate
+   * @return the instantiated outputs, in the same order as
+   *         {@code outputSpecs}
+   * @throws TezUncheckedException if an instantiated class is not a
+   *           LogicalOutput
+   */
+  private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
+    List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
+        outputSpecs.size());
+    for (OutputSpec outputSpec : outputSpecs) {
+      // Separator added so the label is not fused with the spec text; this
+      // matches the message format used when creating inputs.
+      LOG.info("Creating Output from OutputSpec: "
+          + outputSpec);
+      Output output = RuntimeUtils.createClazzInstance(outputSpec
+          .getOutputDescriptor().getClassName());
+      if (output instanceof LogicalOutput) {
+        outputs.add((LogicalOutput) output);
+      } else {
+        throw new TezUncheckedException(output.getClass().getName()
+            + " is not a sub-type of LogicalOutput."
+            + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
+      }
+    }
+    return outputs;
+  }
+
+  /**
+   * Instantiates the processor class named by the descriptor.
+   *
+   * @param processorDescriptor descriptor naming the processor class
+   * @return the instantiated processor
+   * @throws TezUncheckedException if the instantiated class is not a
+   *           LogicalIOProcessor
+   */
+  private LogicalIOProcessor createProcessor(
+      ProcessorDescriptor processorDescriptor) {
+    Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+        .getClassName());
+    if (!(processor instanceof LogicalIOProcessor)) {
+      // The message previously named LogicalOutput as the supported type,
+      // which was a copy/paste error from the output factory.
+      throw new TezUncheckedException(processor.getClass().getName()
+          + " is not a sub-type of LogicalIOProcessor."
+          + " Only LogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
+    }
+    return (LogicalIOProcessor) processor;
+  }
+
+  /**
+   * Wraps task-generated events with metadata describing their source and
+   * hands them to the umbilical. Does nothing for a null or empty list.
+   *
+   * @param events events produced by an Input, Output or Processor
+   * @param generator the kind of entity that produced the events
+   * @param taskVertexName name of the vertex this task belongs to
+   * @param edgeVertexName name of the vertex at the other end of the edge
+   * @param taskAttemptID attempt that produced the events
+   */
+  private void sendTaskGeneratedEvents(List<Event> events,
+      EventProducerConsumerType generator, String taskVertexName,
+      String edgeVertexName, TezTaskAttemptID taskAttemptID) {
+    if (events != null && !events.isEmpty()) {
+      // All events in one call share the same source metadata instance.
+      EventMetaData sourceMetaData = new EventMetaData(generator,
+          taskVertexName, edgeVertexName, taskAttemptID);
+      List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+      for (Event generated : events) {
+        wrapped.add(new TezEvent(generated, sourceMetaData));
+      }
+      if (LOG.isDebugEnabled()) {
+        for (TezEvent wrappedEvent : wrapped) {
+          LOG.debug("Generated event info"
+              + ", eventMetaData=" + sourceMetaData.toString()
+              + ", eventType=" + wrappedEvent.getEventType());
+        }
+      }
+      tezUmbilical.addEvents(wrapped);
+    }
+  }
+
+  /**
+   * Delivers a single event to its target inside this task.
+   *
+   * The target is chosen from the event's destination info: an Input or
+   * Output is looked up by edge vertex name, PROCESSOR events go to the
+   * processor, and SYSTEM events are only logged.
+   *
+   * @param e the event to deliver
+   * @return {@code true} if delivery completed without throwing;
+   *         {@code false} if anything was thrown, in which case the fatal
+   *         error has been recorded on this task and signalled through the
+   *         umbilical
+   */
+  private boolean handleEvent(TezEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling TezEvent in task"
+          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+          + ", eventType=" + e.getEventType()
+          + ", eventSourceInfo=" + e.getSourceInfo()
+          + ", eventDestinationInfo=" + e.getDestinationInfo());
+    }
+    try {
+      switch (e.getDestinationInfo().getEventGenerator()) {
+      case INPUT:
+        LogicalInput input = inputMap.get(
+            e.getDestinationInfo().getEdgeVertexName());
+        if (input != null) {
+          input.handleEvents(Collections.singletonList(e.getEvent()));
+        } else {
+          // No input registered for that source vertex; the throw is caught
+          // below and reported as a fatal error.
+          throw new TezUncheckedException("Unhandled event for invalid target: "
+              + e);
+        }
+        break;
+      case OUTPUT:
+        LogicalOutput output = outputMap.get(
+            e.getDestinationInfo().getEdgeVertexName());
+        if (output != null) {
+          output.handleEvents(Collections.singletonList(e.getEvent()));
+        } else {
+          // No output registered for that destination vertex; reported as a
+          // fatal error by the catch block below.
+          throw new TezUncheckedException("Unhandled event for invalid target: "
+              + e);
+        }
+        break;
+      case PROCESSOR:
+        processor.handleEvents(Collections.singletonList(e.getEvent()));
+        break;
+      case SYSTEM:
+        // System events are not delivered to anything in the task; the
+        // method still returns true for them.
+        LOG.warn("Trying to send a System event in a Task: " + e);
+        break;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Failed to handle event", t);
+      setFatalError(t, "Failed to handle event");
+      // The component that was meant to consume the event is reported as the
+      // source of the fatal error.
+      EventMetaData sourceInfo = new EventMetaData(
+          e.getDestinationInfo().getEventGenerator(),
+          taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
+          getTaskAttemptID());
+      tezUmbilical.signalFatalError(getTaskAttemptID(),
+          StringUtils.stringifyException(t), sourceInfo);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Accepts events destined for this task and queues them in
+   * {@code eventsToBeProcessed}, from which the event router thread takes
+   * them. The event counter is advanced by the number of events received.
+   * A null or empty collection is ignored.
+   *
+   * @param events events to queue
+   */
+  @Override
+  public synchronized void handleEvents(Collection<TezEvent> events) {
+    final boolean nothingToQueue = (events == null) || events.isEmpty();
+    if (nothingToQueue) {
+      return;
+    }
+    final int received = events.size();
+    eventCounter.addAndGet(received);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received events to be processed by task"
+          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+          + ", eventCount=" + received
+          + ", newEventCounter=" + eventCounter.get());
+    }
+    eventsToBeProcessed.addAll(events);
+  }
+
+  /**
+   * Creates, names and starts the event router thread.
+   *
+   * The thread repeatedly takes one event from {@code eventsToBeProcessed}
+   * and passes it to {@code handleEvent}. It stops when the task is done,
+   * when it is interrupted, or when an event fails to be handled.
+   */
+  private void startRouterThread() {
+    eventRouterThread = new Thread(new Runnable() {
+      public void run() {
+        while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
+          try {
+            TezEvent e = eventsToBeProcessed.take();
+            // NOTE(review): presumably eventsToBeProcessed is a BlockingQueue,
+            // whose take() never returns null; if so this check is purely
+            // defensive. Confirm against the field declaration.
+            if (e == null) {
+              continue;
+            }
+            // TODO TODONEWTEZ
+            if (!handleEvent(e)) {
+              // handleEvent has already recorded and signalled the fatal
+              // error; no further events are routed.
+              LOG.warn("Stopping Event Router thread as failed to handle"
+                  + " event: " + e);
+              return;
+            }
+          } catch (InterruptedException e) {
+            // An interrupt after the task is done is the normal shutdown
+            // path (see close() and cleanup()), so it is only logged when
+            // the task is still live.
+            if (!isTaskDone()) {
+              LOG.warn("Event Router thread interrupted. Returning.");
+            }
+            return;
+          }
+        }
+      }
+    });
+
+    eventRouterThread.setName("TezTaskEventRouter["
+        + taskSpec.getTaskAttemptID().toString() + "]");
+    eventRouterThread.start();
+  }
+
+  /**
+   * Marks the task as done and interrupts the event router thread if one
+   * was started.
+   */
+  public synchronized void cleanup() {
+    setTaskDone();
+    if (eventRouterThread == null) {
+      return;
+    }
+    eventRouterThread.interrupt();
+  }
+  
+  /**
+   * Test hook.
+   *
+   * @return the list of input contexts held by this task (not a copy)
+   */
+  @Private
+  @VisibleForTesting
+  public List<TezInputContext> getInputContexts() {
+    return inputContexts;
+  }
+  
+  /**
+   * Test hook.
+   *
+   * @return the list of output contexts held by this task (not a copy)
+   */
+  @Private
+  @VisibleForTesting
+  public List<TezOutputContext> getOutputContexts() {
+    return outputContexts;
+  }
+
+  /**
+   * Test hook.
+   *
+   * @return the processor context stored on this task
+   */
+  @Private
+  @VisibleForTesting
+  public TezProcessorContext getProcessorContext() {
+    return processorContext;
+  }
+  
+  /**
+   * Test hook.
+   *
+   * @return the map of inputs held by this task (not a copy)
+   */
+  @Private
+  @VisibleForTesting
+  public Map<String, LogicalInput> getInputs() {
+    return inputMap;
+  }
+  
+  /**
+   * Test hook.
+   *
+   * @return the map of outputs held by this task (not a copy)
+   */
+  @Private
+  @VisibleForTesting
+  public Map<String, LogicalOutput> getOutputs() {
+    return outputMap;
+  }
+  
+  /**
+   * Test hook.
+   *
+   * @return the processor held by this task
+   */
+  @Private
+  @VisibleForTesting
+  public LogicalIOProcessor getProcessor() {
+    return processor;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
new file mode 100644
index 0000000..f018333
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+/**
+ * Base class holding the state shared by runtime task implementations: the
+ * task spec, configuration, umbilical, counters, progress, an event counter,
+ * a fatal-error record and a "task done" flag.
+ */
+public abstract class RuntimeTask {
+
+  protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
+  protected Throwable fatalError = null;
+  protected String fatalErrorMessage = null;
+  protected float progress;
+  protected final TezCounters tezCounters;
+  protected final TaskSpec taskSpec;
+  protected final Configuration tezConf;
+  protected final TezUmbilical tezUmbilical;
+  protected final AtomicInteger eventCounter;
+  private final AtomicBoolean taskDone;
+
+  /**
+   * @param taskSpec spec of the task being run
+   * @param tezConf configuration for the task
+   * @param tezUmbilical umbilical used by the task
+   */
+  protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+      TezUmbilical tezUmbilical) {
+    this.taskSpec = taskSpec;
+    this.tezConf = tezConf;
+    this.tezUmbilical = tezUmbilical;
+    this.tezCounters = new TezCounters();
+    this.eventCounter = new AtomicInteger(0);
+    this.progress = 0.0f;
+    this.taskDone = new AtomicBoolean(false);
+  }
+
+  /** Lifecycle states available to subclasses. */
+  protected enum State {
+    NEW, INITED, RUNNING, CLOSED;
+  }
+
+  // Not assigned by this class; subclasses are expected to set it.
+  protected State state;
+
+  /** @return the vertex name taken from the task spec */
+  public String getVertexName() {
+    return taskSpec.getVertexName();
+  }
+
+  /**
+   * Records a fatal error on this task.
+   *
+   * @param t the error that occurred
+   * @param message a description of the failure
+   */
+  public void setFatalError(Throwable t, String message) {
+    // Store the details before raising the flag, so that a thread observing
+    // hadFatalError() == true also observes the error and message. The
+    // atomic write orders the two plain field writes before it.
+    this.fatalError = t;
+    this.fatalErrorMessage = message;
+    hasFatalError.set(true);
+  }
+
+  /** @return whether {@link #setFatalError} has been called */
+  public boolean hadFatalError() {
+    return hasFatalError.get();
+  }
+
+  /** @param progress the new progress value to store */
+  public synchronized void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  /** @return the last stored progress value (initially 0.0) */
+  public synchronized float getProgress() {
+    return this.progress;
+  }
+
+  /** @return the counters instance created for this task */
+  public TezCounters getCounters() {
+    return this.tezCounters;
+  }
+
+  /** @return the task attempt id taken from the task spec */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskSpec.getTaskAttemptID();
+  }
+
+  /**
+   * Delivers events to this task; how they are processed is up to the
+   * subclass.
+   *
+   * @param events the events to handle
+   */
+  public abstract void handleEvents(Collection<TezEvent> events);
+
+  /** @return the current value of the event counter */
+  public int getEventCounter() {
+    return eventCounter.get();
+  }
+
+  /** @return whether {@link #setTaskDone} has been called */
+  public boolean isTaskDone() {
+    return taskDone.get();
+  }
+
+  /** Marks the task as done; the flag is never reset. */
+  protected void setTaskDone() {
+    taskDone.set(true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
new file mode 100644
index 0000000..8a1b550
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tez.dag.api.TezUncheckedException;
+
+/**
+ * Reflection helpers for instantiating classes by name through their
+ * zero-argument constructor.
+ */
+public class RuntimeUtils {
+
+  /** Classes already resolved by {@link #getClazz}, keyed by class name. */
+  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  /**
+   * Resolves a class by name, consulting the cache first.
+   *
+   * @param className name of the class to load
+   * @return the loaded class
+   * @throws TezUncheckedException if the class cannot be found
+   */
+  private static Class<?> getClazz(String className) {
+    Class<?> clazz = CLAZZ_CACHE.get(className);
+    if (clazz == null) {
+      try {
+        clazz = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Unable to load class: " + className, e);
+      }
+      // Remember the result; previously the cache was read but never
+      // written, so every call fell through to Class.forName. Two threads
+      // racing here store the same Class object, so a plain put is enough.
+      CLAZZ_CACHE.put(className, clazz);
+    }
+    return clazz;
+  }
+
+  /**
+   * Instantiates a class through its zero-argument constructor.
+   *
+   * @param clazz the class to instantiate
+   * @return a new instance
+   * @throws TezUncheckedException if the class cannot be instantiated or its
+   *           constructor is not accessible
+   */
+  private static <T> T getNewInstance(Class<T> clazz) {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  /**
+   * Loads the named class and returns a new instance of it.
+   *
+   * The cast to {@code T} is unchecked: a mismatch between the named class
+   * and the type expected by the caller surfaces as a ClassCastException at
+   * the call site, not here.
+   *
+   * @param className name of the class to instantiate
+   * @return a new instance of the named class
+   * @throws TezUncheckedException if the class cannot be loaded or
+   *           instantiated
+   */
+  public static <T> T createClazzInstance(String className) {
+    Class<?> clazz = getClazz(className);
+    @SuppressWarnings("unchecked")
+    T instance = (T) getNewInstance(clazz);
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..597718f
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Event that carries no data of its own.
+ *
+ * NOTE(review): by its name this presumably signals that a task attempt has
+ * completed - confirm against the code that produces and consumes it.
+ */
+public class TaskAttemptCompletedEvent extends Event {
+
+  public TaskAttemptCompletedEvent() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..935fdbb
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Event carrying a diagnostics string.
+ *
+ * NOTE(review): by its name this presumably reports a failed task attempt -
+ * confirm against the code that produces and consumes it.
+ */
+public class TaskAttemptFailedEvent extends Event {
+
+  // Set once at construction; may be null if the caller passed null.
+  private final String diagnostics;
+
+  /**
+   * @param diagnostics text stored on the event and returned by
+   *          {@link #getDiagnostics()}
+   */
+  public TaskAttemptFailedEvent(String diagnostics) {
+    this.diagnostics = diagnostics;
+  }
+
+  /** @return the diagnostics string given at construction */
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
new file mode 100644
index 0000000..47c2998
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+
+/**
+ * Event carrying a progress value and, optionally, a set of counters.
+ *
+ * Serialized form: the progress float, then a boolean saying whether
+ * counters follow, then the counters if present.
+ */
+public class TaskStatusUpdateEvent extends Event implements Writable {
+
+  // May be null; write() records its presence with a boolean marker.
+  private TezCounters tezCounters;
+  private float progress;
+
+  /** No-argument constructor, for populating via {@link #readFields}. */
+  public TaskStatusUpdateEvent() {
+  }
+
+  /**
+   * @param tezCounters counters to carry; may be null
+   * @param progress progress value to carry
+   */
+  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+    this.tezCounters = tezCounters;
+    this.progress = progress;
+  }
+
+  /** @return the counters carried by this event, or null if none */
+  public TezCounters getCounters() {
+    return tezCounters;
+  }
+
+  /** @return the progress value carried by this event */
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(progress);
+    if (tezCounters != null) {
+      out.writeBoolean(true);
+      tezCounters.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    progress = in.readFloat();
+    if (in.readBoolean()) {
+      tezCounters = new TezCounters();
+      tezCounters.readFields(in);
+    } else {
+      // Clear any counters left over from a previous readFields call on a
+      // reused instance, so the object mirrors exactly what was written.
+      tezCounters = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
new file mode 100644
index 0000000..d650fa3
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Class that encapsulates all the information to identify the unique
+ * object that either generated an Event or is the recipient of an Event.
+ */
+public class EventMetaData implements Writable {
+
+  // The serialized form stores the ordinal of this enum (see write()), so
+  // the order of the constants is part of the wire format.
+  public static enum EventProducerConsumerType {
+    INPUT,
+    PROCESSOR,
+    OUTPUT,
+    SYSTEM
+  }
+
+  /**
+   * Producer Type ( one of Input/Output/Processor ) that generated the Event
+   * or Consumer Type that will consume the Event.
+   */
+  private EventProducerConsumerType producerConsumerType;
+
+  /**
+   * Name of the vertex where the event was generated.
+   */
+  private String taskVertexName;
+
+  /**
+   * Name of the vertex to which the Input or Output is connected to.
+   */
+  private String edgeVertexName;
+
+  /**
+   * i'th physical input/output that this event maps to.
+   */
+  private int index;
+
+  /**
+   * Task Attempt ID
+   */
+  private TezTaskAttemptID taskAttemptID;
+
+  /** No-argument constructor, for populating via {@link #readFields}. */
+  public EventMetaData() {
+  }
+
+  /**
+   * @param generator type of the producer or consumer
+   * @param taskVertexName name of the task's vertex; may be null
+   * @param edgeVertexName name of the vertex on the connected edge; may be
+   *          null
+   * @param taskAttemptID task attempt id; may be null
+   */
+  public EventMetaData(EventProducerConsumerType generator,
+      String taskVertexName, String edgeVertexName,
+      TezTaskAttemptID taskAttemptID) {
+    this.producerConsumerType = generator;
+    this.taskVertexName = taskVertexName;
+    this.edgeVertexName = edgeVertexName;
+    this.taskAttemptID = taskAttemptID;
+  }
+
+  public EventProducerConsumerType getEventGenerator() {
+    return producerConsumerType;
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptID;
+  }
+
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+  public String getEdgeVertexName() {
+    return edgeVertexName;
+  }
+
+  /**
+   * Writes the type ordinal, then each optional field preceded by a boolean
+   * presence marker, then the index.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(producerConsumerType.ordinal());
+    if (taskVertexName != null) {
+      out.writeBoolean(true);
+      out.writeUTF(taskVertexName);
+    } else {
+      out.writeBoolean(false);
+    }
+    if (edgeVertexName != null) {
+      out.writeBoolean(true);
+      out.writeUTF(edgeVertexName);
+    } else {
+      out.writeBoolean(false);
+    }
+    if(taskAttemptID != null) {
+      out.writeBoolean(true);
+      taskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    
+    out.writeInt(index);
+  }
+
+  /**
+   * Reads the fields in the order {@link #write} emits them. Optional fields
+   * that are absent from the stream are reset to null, so a reused instance
+   * does not keep values from an earlier read.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
+    taskVertexName = in.readBoolean() ? in.readUTF() : null;
+    edgeVertexName = in.readBoolean() ? in.readUTF() : null;
+    if (in.readBoolean()) {
+      taskAttemptID = new TezTaskAttemptID();
+      taskAttemptID.readFields(in);
+    } else {
+      taskAttemptID = null;
+    }
+    index = in.readInt();
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public void setIndex(int index) {
+    this.index = index;
+  }
+
+  @Override
+  public String toString() {
+    return "{ producerConsumerType=" + producerConsumerType
+        + ", taskVertexName=" + taskVertexName
+        + ", edgeVertexName=" + edgeVertexName
+        + ", taskAttemptId=" + taskAttemptID
+        + ", index=" + index + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
new file mode 100644
index 0000000..81ff5fc
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+/**
+ * Identifiers for the kinds of events known to the runtime.
+ *
+ * NOTE(review): the constant names suggest each maps to an Event class of
+ * the same name - confirm against the code that uses this enum.
+ */
+public enum EventType {
+  TASK_ATTEMPT_COMPLETED_EVENT,
+  TASK_ATTEMPT_FAILED_EVENT,
+  DATA_MOVEMENT_EVENT,
+  INPUT_READ_ERROR_EVENT,
+  INPUT_FAILED_EVENT,
+  // NOTE(review): "INTPUT" looks like a misspelling of "INPUT"; the constant
+  // is public, so renaming it would need all users updated together.
+  INTPUT_INFORMATION_EVENT,
+  TASK_STATUS_UPDATE_EVENT
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
new file mode 100644
index 0000000..78ed886
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+/**
+ * Serializable description of a single task input: the name of the source
+ * vertex, the {@link InputDescriptor} of the input, and a physical edge
+ * count.
+ *
+ * <p>On the wire the descriptor is carried as a length-prefixed
+ * {@link TezEntityDescriptorProto} byte array.
+ */
+public class InputSpec implements Writable {
+
+  private String sourceVertexName;
+  private InputDescriptor inputDescriptor;
+  private int physicalEdgeCount;
+
+  /** Creates an empty instance; fields are populated by {@link #readFields}. */
+  public InputSpec() {
+  }
+
+  /**
+   * @param sourceVertexName name of the vertex this input reads from
+   * @param inputDescriptor descriptor of the input
+   * @param physicalEdgeCount physical edge count for this input
+   */
+  public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
+      int physicalEdgeCount) {
+    this.sourceVertexName = sourceVertexName;
+    this.inputDescriptor = inputDescriptor;
+    this.physicalEdgeCount = physicalEdgeCount;
+  }
+
+  /** @return the source vertex name */
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  /** @return the input descriptor */
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
+  }
+
+  /** @return the physical edge count */
+  public int getPhysicalEdgeCount() {
+    return physicalEdgeCount;
+  }
+
+  /**
+   * Writes the vertex name, the edge count, and then the descriptor as a
+   * length-prefixed protobuf byte array.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODONEWTEZ convert to PB
+    out.writeUTF(sourceVertexName);
+    out.writeInt(physicalEdgeCount);
+    byte[] inputDescBytes =
+        DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+    out.writeInt(inputDescBytes.length);
+    out.write(inputDescBytes);
+  }
+
+  /**
+   * Reads the fields in the same order in which {@link #write} emits them.
+   *
+   * @throws IOException if reading from {@code in} or parsing the descriptor
+   *         bytes fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    sourceVertexName = in.readUTF();
+    physicalEdgeCount = in.readInt();
+    int inputDescLen = in.readInt();
+    byte[] inputDescBytes = new byte[inputDescLen];
+    in.readFully(inputDescBytes);
+    inputDescriptor =
+        DagTypeConverters.convertInputDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(inputDescBytes));
+  }
+
+  /** @return a one-line summary of this spec for logging */
+  @Override
+  public String toString() {
+    // The original omitted '=' after physicalEdgeCount, gluing key and value.
+    return "{ sourceVertexName=" + sourceVertexName
+        + ", physicalEdgeCount=" + physicalEdgeCount
+        + ", inputClassName=" + inputDescriptor.getClassName()
+        + " }";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
new file mode 100644
index 0000000..4034cdd
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/OutputSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+/**
+ * Serializable description of a single task output: the name of the
+ * destination vertex, the {@link OutputDescriptor} of the output, and a
+ * physical edge count.
+ *
+ * <p>On the wire the descriptor is carried as a length-prefixed
+ * {@link TezEntityDescriptorProto} byte array.
+ */
+public class OutputSpec implements Writable {
+
+  private String destinationVertexName;
+  private OutputDescriptor outputDescriptor;
+  private int physicalEdgeCount;
+
+  /** Creates an empty instance; fields are populated by {@link #readFields}. */
+  public OutputSpec() {
+  }
+
+  /**
+   * @param destinationVertexName name of the vertex this output writes to
+   * @param outputDescriptor descriptor of the output
+   * @param physicalEdgeCount physical edge count for this output
+   */
+  public OutputSpec(String destinationVertexName,
+      OutputDescriptor outputDescriptor, int physicalEdgeCount) {
+    this.destinationVertexName = destinationVertexName;
+    this.outputDescriptor = outputDescriptor;
+    this.physicalEdgeCount = physicalEdgeCount;
+  }
+
+  /** @return the destination vertex name */
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  /** @return the output descriptor */
+  public OutputDescriptor getOutputDescriptor() {
+    return outputDescriptor;
+  }
+
+  /** @return the physical edge count */
+  public int getPhysicalEdgeCount() {
+    return physicalEdgeCount;
+  }
+
+  /**
+   * Writes the vertex name, the edge count, and then the descriptor as a
+   * length-prefixed protobuf byte array.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODONEWTEZ convert to PB
+    out.writeUTF(destinationVertexName);
+    out.writeInt(physicalEdgeCount);
+    byte[] outputDescBytes =
+        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
+    out.writeInt(outputDescBytes.length);
+    out.write(outputDescBytes);
+  }
+
+  /**
+   * Reads the fields in the same order in which {@link #write} emits them.
+   *
+   * @throws IOException if reading from {@code in} or parsing the descriptor
+   *         bytes fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    destinationVertexName = in.readUTF();
+    physicalEdgeCount = in.readInt();
+    int outputDescLen = in.readInt();
+    byte[] outputDescBytes = new byte[outputDescLen];
+    in.readFully(outputDescBytes);
+    outputDescriptor =
+        DagTypeConverters.convertOutputDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(outputDescBytes));
+  }
+
+  /** @return a one-line summary of this spec for logging */
+  @Override
+  public String toString() {
+    // The original omitted '=' after physicalEdgeCount, gluing key and value.
+    return "{ destinationVertexName=" + destinationVertexName
+        + ", physicalEdgeCount=" + physicalEdgeCount
+        + ", outputClassName=" + outputDescriptor.getClassName()
+        + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
new file mode 100644
index 0000000..6e0995a
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Serializable specification of a task attempt: its attempt ID, vertex name,
+ * processor descriptor, and the lists of input and output specs.
+ *
+ * <p>NOTE(review): {@code user} is held in memory but is neither written by
+ * {@link #write} nor read by {@link #readFields}, so {@link #getUser()}
+ * returns {@code null} on an instance populated only through
+ * {@code readFields}. Confirm this is intended (see the TODO on the
+ * constructor).
+ */
+public class TaskSpec implements Writable {
+
+  private TezTaskAttemptID taskAttemptId;
+  private String vertexName;
+  private String user;
+  private ProcessorDescriptor processorDescriptor;
+  private List<InputSpec> inputSpecList;
+  private List<OutputSpec> outputSpecList;
+
+  /** Creates an empty instance; fields are populated by {@link #readFields}. */
+  public TaskSpec() {
+  }
+
+  // TODO NEWTEZ Remove user
+  /**
+   * @param taskAttemptID ID of the task attempt
+   * @param user user name associated with the task
+   * @param vertexName name of the vertex the task belongs to
+   * @param processorDescriptor descriptor of the processor
+   * @param inputSpecList input specs; stored by reference, not copied
+   * @param outputSpecList output specs; stored by reference, not copied
+   */
+  public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
+      String vertexName, ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+    this.taskAttemptId = taskAttemptID;
+    this.vertexName = vertexName;
+    this.user = user;
+    this.processorDescriptor = processorDescriptor;
+    this.inputSpecList = inputSpecList;
+    this.outputSpecList = outputSpecList;
+  }
+
+  /** @return the vertex name */
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  /** @return the task attempt ID */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  /** @return the user name; not part of the serialized form */
+  public String getUser() {
+    return user;
+  }
+
+  /** @return the processor descriptor */
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return processorDescriptor;
+  }
+
+  /** @return the input specs (the internal list, not a copy) */
+  public List<InputSpec> getInputs() {
+    return inputSpecList;
+  }
+
+  /** @return the output specs (the internal list, not a copy) */
+  public List<OutputSpec> getOutputs() {
+    return outputSpecList;
+  }
+
+  /**
+   * Writes the attempt ID, the vertex name, the processor descriptor as a
+   * length-prefixed protobuf byte array, and then the count-prefixed input
+   * and output spec lists.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    out.writeUTF(vertexName);
+    byte[] procDesc =
+        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
+    out.writeInt(procDesc.length);
+    out.write(procDesc);
+    out.writeInt(inputSpecList.size());
+    for (InputSpec inputSpec : inputSpecList) {
+      inputSpec.write(out);
+    }
+    out.writeInt(outputSpecList.size());
+    for (OutputSpec outputSpec : outputSpecList) {
+      outputSpec.write(out);
+    }
+  }
+
+  /**
+   * Reads the fields in the same order in which {@link #write} emits them,
+   * replacing the spec lists with freshly allocated ones.
+   *
+   * @throws IOException if reading from {@code in} or parsing the descriptor
+   *         bytes fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = new TezTaskAttemptID();
+    taskAttemptId.readFields(in);
+    vertexName = in.readUTF();
+    int procDescLength = in.readInt();
+    // TODO at least 3 buffer copies here. Need to convert this to full PB
+    // TEZ-305
+    byte[] procDescBytes = new byte[procDescLength];
+    in.readFully(procDescBytes);
+    processorDescriptor =
+        DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(procDescBytes));
+    int numInputSpecs = in.readInt();
+    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+    for (int i = 0; i < numInputSpecs; i++) {
+      InputSpec inputSpec = new InputSpec();
+      inputSpec.readFields(in);
+      inputSpecList.add(inputSpec);
+    }
+    int numOutputSpecs = in.readInt();
+    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+    for (int i = 0; i < numOutputSpecs; i++) {
+      OutputSpec outputSpec = new OutputSpec();
+      outputSpec.readFields(in);
+      outputSpecList.add(outputSpec);
+    }
+  }
+
+  /** @return a one-line summary of this spec for logging */
+  @Override
+  public String toString() {
+    // StringBuilder: the buffer never leaves this method, so the
+    // synchronization of StringBuffer buys nothing.
+    StringBuilder sb = new StringBuilder();
+    sb.append("TaskAttemptID:").append(taskAttemptId);
+    // The original ran the attempt ID straight into "processorName=" with no
+    // separator between the two fields.
+    sb.append(", processorName=").append(processorDescriptor.getClassName())
+        .append(", inputSpecListSize=").append(inputSpecList.size())
+        .append(", outputSpecListSize=").append(outputSpecList.size());
+    sb.append(", inputSpecList=[");
+    for (InputSpec i : inputSpecList) {
+      sb.append("{").append(i.toString()).append("}, ");
+    }
+    sb.append("], outputSpecList=[");
+    for (OutputSpec i : outputSpecList) {
+      sb.append("{").append(i.toString()).append("}, ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
new file mode 100644
index 0000000..e195cf9
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputInformationEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Serializable envelope around an {@link Event}, tagged with an
+ * {@link EventType} and optional source and destination
+ * {@link EventMetaData}.
+ *
+ * <p>Wire layout produced by {@link #write}: an event-present flag; if set,
+ * the {@code EventType} ordinal followed by the event payload; then a
+ * presence flag plus payload for the source info, and the same for the
+ * destination info. {@link TaskStatusUpdateEvent} serializes itself; every
+ * other event type is written as a length-prefixed protobuf byte array.
+ */
+public class TezEvent implements Writable {
+
+  private EventType eventType;
+
+  private Event event;
+
+  private EventMetaData sourceInfo;
+
+  private EventMetaData destinationInfo;
+
+  /** Creates an empty instance; fields are populated by {@link #readFields}. */
+  public TezEvent() {
+  }
+
+  /**
+   * Wraps {@code event} and derives its {@link EventType} from its runtime
+   * class.
+   *
+   * @param event the event to wrap; must be non-null
+   * @param sourceInfo metadata describing the event's source
+   * @throws TezUncheckedException if {@code event} is not one of the
+   *         supported event classes
+   */
+  public TezEvent(Event event, EventMetaData sourceInfo) {
+    this.event = event;
+    this.setSourceInfo(sourceInfo);
+    if (event instanceof DataMovementEvent) {
+      eventType = EventType.DATA_MOVEMENT_EVENT;
+    } else if (event instanceof InputReadErrorEvent) {
+      eventType = EventType.INPUT_READ_ERROR_EVENT;
+    } else if (event instanceof TaskAttemptFailedEvent) {
+      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptCompletedEvent) {
+      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+    } else if (event instanceof InputInformationEvent) {
+      eventType = EventType.INTPUT_INFORMATION_EVENT;
+    } else if (event instanceof InputFailedEvent) {
+      eventType = EventType.INPUT_FAILED_EVENT;
+    } else if (event instanceof TaskStatusUpdateEvent) {
+      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
+    } else {
+      throw new TezUncheckedException("Unknown event, event="
+          + event.getClass().getName());
+    }
+  }
+
+  /** @return the wrapped event, or {@code null} if none is set */
+  public Event getEvent() {
+    return event;
+  }
+
+  /** @return metadata describing the event's source, possibly {@code null} */
+  public EventMetaData getSourceInfo() {
+    return sourceInfo;
+  }
+
+  /** @param sourceInfo metadata describing the event's source */
+  public void setSourceInfo(EventMetaData sourceInfo) {
+    this.sourceInfo = sourceInfo;
+  }
+
+  /** @return metadata describing the destination, possibly {@code null} */
+  public EventMetaData getDestinationInfo() {
+    return destinationInfo;
+  }
+
+  /** @param destinationInfo metadata describing the event's destination */
+  public void setDestinationInfo(EventMetaData destinationInfo) {
+    this.destinationInfo = destinationInfo;
+  }
+
+  /** @return the type tag of the wrapped event, or {@code null} if none */
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Writes the event-present flag and, if an event is set, its type ordinal
+   * and payload.
+   *
+   * @throws IOException if writing to {@code out} fails
+   * @throws TezUncheckedException if the event type has no serializer here
+   */
+  private void serializeEvent(DataOutput out) throws IOException {
+    if (event == null) {
+      out.writeBoolean(false);
+      return;
+    }
+    out.writeBoolean(true);
+    // The ordinal is the wire tag; deserializeEvent maps it back through
+    // EventType.values(), so the enum's declaration order must stay stable.
+    out.writeInt(eventType.ordinal());
+    if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+      // TODO NEWTEZ convert to PB
+      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
+      sEvt.write(out);
+    } else {
+      byte[] eventBytes = null;
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEvent dmEvt = (DataMovementEvent) event;
+        eventBytes = DataMovementEventProto.newBuilder()
+          .setSourceIndex(dmEvt.getSourceIndex())
+          .setTargetIndex(dmEvt.getTargetIndex())
+          .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
+          .build().toByteArray();
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
+        // NOTE(review): the version is not written here, yet
+        // deserializeEvent passes ideProto.getVersion() to the constructor,
+        // so the version does not survive a round trip - confirm and add
+        // setVersion(...) if InputReadErrorEvent exposes a getter for it.
+        eventBytes = InputReadErrorEventProto.newBuilder()
+            .setIndex(ideEvt.getIndex())
+            .setDiagnostics(ideEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+            .setDiagnostics(tfEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+            .build().toByteArray();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEvent ifEvt = (InputFailedEvent) event;
+        eventBytes = InputFailedEventProto.newBuilder()
+            .setSourceIndex(ifEvt.getSourceIndex())
+            .setTargetIndex(ifEvt.getTargetIndex())
+            .setVersion(ifEvt.getVersion()).build().toByteArray();
+        // Without this break control fell into the next case and failed on
+        // the cast to InputInformationEvent.
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEvent iEvt = (InputInformationEvent) event;
+        eventBytes = InputInformationEventProto.newBuilder()
+            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+            .build().toByteArray();
+        // Without this break control fell into default and always threw.
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
+      out.writeInt(eventBytes.length);
+      out.write(eventBytes);
+    }
+  }
+
+  /**
+   * Reads what {@link #serializeEvent} wrote and rebuilds the event.
+   *
+   * @throws IOException if reading from {@code in} or parsing the payload
+   *         fails
+   * @throws TezUncheckedException if the event type has no deserializer here
+   */
+  private void deserializeEvent(DataInput in) throws IOException {
+    if (!in.readBoolean()) {
+      event = null;
+      // Clear the tag as well so a reused instance does not report the type
+      // of a previously held event.
+      eventType = null;
+      return;
+    }
+    eventType = EventType.values()[in.readInt()];
+    if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+      // TODO NEWTEZ convert to PB
+      event = new TaskStatusUpdateEvent();
+      ((TaskStatusUpdateEvent)event).readFields(in);
+    } else {
+      int eventBytesLen = in.readInt();
+      byte[] eventBytes = new byte[eventBytesLen];
+      in.readFully(eventBytes);
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEventProto dmProto =
+            DataMovementEventProto.parseFrom(eventBytes);
+        event = new DataMovementEvent(dmProto.getSourceIndex(),
+            dmProto.getTargetIndex(),
+            dmProto.getUserPayload().toByteArray());
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEventProto ideProto =
+            InputReadErrorEventProto.parseFrom(eventBytes);
+        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+            ideProto.getIndex(), ideProto.getVersion());
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEventProto tfProto =
+            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        event = new TaskAttemptCompletedEvent();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEventProto ifProto =
+            InputFailedEventProto.parseFrom(eventBytes);
+        event = new InputFailedEvent(ifProto.getSourceIndex(),
+            ifProto.getTargetIndex(), ifProto.getVersion());
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEventProto infoProto =
+            InputInformationEventProto.parseFrom(eventBytes);
+        event = new InputInformationEvent(
+            infoProto.getUserPayload().toByteArray());
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
+    }
+  }
+
+  /**
+   * Writes the event followed by the optional source and destination
+   * metadata, each preceded by a presence flag.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    serializeEvent(out);
+    if (sourceInfo != null) {
+      out.writeBoolean(true);
+      sourceInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    if (destinationInfo != null) {
+      out.writeBoolean(true);
+      destinationInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Reads the fields in the same order in which {@link #write} emits them.
+   * Fields that are absent on the wire are reset to {@code null} so that a
+   * reused instance does not keep values from an earlier read.
+   *
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    deserializeEvent(in);
+    if (in.readBoolean()) {
+      sourceInfo = new EventMetaData();
+      sourceInfo.readFields(in);
+    } else {
+      sourceInfo = null;
+    }
+    if (in.readBoolean()) {
+      destinationInfo = new EventMetaData();
+      destinationInfo.readFields(in);
+    } else {
+      destinationInfo = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
new file mode 100644
index 0000000..af7cebb
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+/**
+ * Serializable heartbeat request: a container identifier, a request ID, a
+ * list of {@link TezEvent}s, the current task attempt ID, and the
+ * {@code startIndex}/{@code maxEvents} pair.
+ *
+ * <p>Both the event list and the task attempt ID are optional on the wire
+ * and are each preceded by a presence flag.
+ */
+public class TezHeartbeatRequest implements Writable {
+
+  private String containerIdentifier;
+  private List<TezEvent> events;
+  private TezTaskAttemptID currentTaskAttemptID;
+  private int startIndex;
+  private int maxEvents;
+  private long requestId;
+
+  /** Creates an empty instance; fields are populated by {@link #readFields}. */
+  public TezHeartbeatRequest() {
+  }
+
+  /**
+   * @param requestId ID of this request
+   * @param events events to send; must be non-null, exposed afterwards
+   *        through an unmodifiable view
+   * @param containerIdentifier identifier of the sending container
+   * @param taskAttemptID current task attempt ID, may be {@code null}
+   * @param startIndex start index carried with the request
+   * @param maxEvents maximum event count carried with the request
+   */
+  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
+      String containerIdentifier, TezTaskAttemptID taskAttemptID,
+      int startIndex, int maxEvents) {
+    this.containerIdentifier = containerIdentifier;
+    this.requestId = requestId;
+    this.events = Collections.unmodifiableList(events);
+    this.startIndex = startIndex;
+    this.maxEvents = maxEvents;
+    this.currentTaskAttemptID = taskAttemptID;
+  }
+
+  /** @return the container identifier */
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+
+  /** @return the events, or {@code null} if none were set or read */
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  /** @return the start index */
+  public int getStartIndex() {
+    return startIndex;
+  }
+
+  /** @return the maximum event count */
+  public int getMaxEvents() {
+    return maxEvents;
+  }
+
+  /** @return the request ID */
+  public long getRequestId() {
+    return requestId;
+  }
+
+  /** @return the current task attempt ID, or {@code null} if absent */
+  public TezTaskAttemptID getCurrentTaskAttemptID() {
+    return currentTaskAttemptID;
+  }
+
+  /**
+   * Writes the optional event list, the optional task attempt ID, then
+   * {@code startIndex}, {@code maxEvents}, {@code requestId} and the
+   * container identifier.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+    if (currentTaskAttemptID != null) {
+      out.writeBoolean(true);
+      currentTaskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    out.writeInt(startIndex);
+    out.writeInt(maxEvents);
+    out.writeLong(requestId);
+    Text.writeString(out, containerIdentifier);
+  }
+
+  /**
+   * Reads the fields in the same order in which {@link #write} emits them.
+   * Optional fields that are absent on the wire are reset to {@code null}.
+   *
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      int eventsCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventsCount);
+      for (int i = 0; i < eventsCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    } else {
+      // Mirror the handling of currentTaskAttemptID below: drop events left
+      // over from an earlier read when this instance is reused.
+      events = null;
+    }
+    if (in.readBoolean()) {
+      currentTaskAttemptID = new TezTaskAttemptID();
+      currentTaskAttemptID.readFields(in);
+    } else {
+      currentTaskAttemptID = null;
+    }
+    startIndex = in.readInt();
+    maxEvents = in.readInt();
+    requestId = in.readLong();
+    containerIdentifier = Text.readString(in);
+  }
+
+  /** @return a one-line summary of this request for logging */
+  @Override
+  public String toString() {
+    // The original omitted '=' after taskAttemptId, gluing key and value.
+    return "{ "
+        + " containerId=" + containerIdentifier
+        + ", requestId=" + requestId
+        + ", startIndex=" + startIndex
+        + ", maxEventsToGet=" + maxEvents
+        + ", taskAttemptId=" + currentTaskAttemptID
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+}