You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/12 23:31:33 UTC

[1/3] tez git commit: TEZ-1933. Move TezChild and related classes into tez-runtime-internals. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master fda4c0bdc -> e250983e5


http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
new file mode 100644
index 0000000..6e655f9
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class TezTaskRunner implements TezUmbilical, ErrorReporter {
+
+  private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
+
+  private final Configuration tezConf;
+  // The runtime task driven by this runner; constructed in the constructor.
+  private final LogicalIOProcessorRuntimeTask task;
+  // Identity under which the task body is executed (see TaskRunnerCallable, ugi.doAs).
+  private final UserGroupInformation ugi;
+
+  private final TaskReporter taskReporter;
+  // Executor on which the TaskRunnerCallable is submitted by run().
+  private final ListeningExecutorService executor;
+  // Future for the submitted task; assigned in run().
+  private volatile ListenableFuture<Void> taskFuture;
+  // Thread blocked in run(); assigned at the start of run() and interrupted by the callbacks
+  // (signalFatalError, canCommit, reportError, shutdownRequested) to wake it up.
+  private volatile Thread waitingThread;
+  // First failure registered via maybeRegisterFirstException(); later failures do not replace it.
+  private volatile Throwable firstException;
+
+  // Effectively a duplicate check, since hadFatalError does the same thing.
+  // Ensures that a failure is reported through taskReporter.taskFailed at most once.
+  private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
+  // Set to true in the constructor and to false once TaskRunnerCallable.call() finishes.
+  private final AtomicBoolean taskRunning;
+  // Set by shutdownRequested(); makes run() return false.
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+  /**
+   * Creates the runner for a single task: builds the {@link LogicalIOProcessorRuntimeTask} and
+   * registers it, together with this runner as the error reporter, with the task reporter.
+   *
+   * <p>The {@code umbilical} parameter is not used by this constructor.
+   *
+   * @throws IOException propagated from constructing the runtime task or registering it
+   */
+  TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+      TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
+      Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
+      Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
+      ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
+      ExecutionContext ExecutionContext)
+          throws IOException {
+    this.tezConf = tezConf;
+    this.ugi = ugi;
+    this.taskReporter = taskReporter;
+    this.executor = executor;
+    // Must be assigned before 'this' is handed to the task and to the task reporter below:
+    // the callbacks on this class (addEvents, canCommit, reportError) dereference taskRunning,
+    // and could otherwise observe it as null if invoked before the constructor completes.
+    this.taskRunning = new AtomicBoolean(true);
+    task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
+        serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
+        ExecutionContext);
+    taskReporter.registerTask(task, this);
+  }
+
+  /**
+   * Submits the task to the executor and blocks the calling thread until the task completes,
+   * fails, or the calling thread is interrupted by one of the callbacks on this class.
+   * The task is always unregistered from the task reporter before this method returns or throws.
+   *
+   * <p>An {@link FSError} recorded as the failure cause is rethrown as-is.
+   *
+   * @return false if a shutdown message was received during task execution, true otherwise
+   * @throws InterruptedException if the failure cause is an InterruptedException, e.g. the wait
+   *         was interrupted without a shutdown request and without a registered exception
+   * @throws TezException if the failure cause is a TezException, a non-FSError {@link Error}, or
+   *         any other throwable that is not an IOException or InterruptedException
+   * @throws IOException if the failure cause is an IOException
+   */
+  public boolean run() throws InterruptedException, IOException, TezException {
+    // Recorded so that the callbacks can interrupt this thread while it blocks on the future.
+    waitingThread = Thread.currentThread();
+    TaskRunnerCallable callable = new TaskRunnerCallable();
+    Throwable failureCause = null;
+    taskFuture = executor.submit(callable);
+    try {
+      taskFuture.get();
+
+      // Task could signal a fatal error and return control, or a failure while registering success.
+      failureCause = firstException;
+
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted while waiting for task to complete. Interrupting task");
+      taskFuture.cancel(true);
+      // A shutdown request takes precedence over any registered exception.
+      if (shutdownRequested.get()) {
+        LOG.info("Shutdown requested... returning");
+        return false;
+      }
+      if (firstException != null) {
+        failureCause = firstException;
+      } else {
+        // Interrupted for some other reason.
+        failureCause = e;
+      }
+    } catch (ExecutionException e) {
+      // Exception thrown by the run() method itself.
+      Throwable cause = e.getCause();
+      if (cause instanceof FSError) {
+        // Not immediately fatal, this is an error reported by Hadoop FileSystem
+        failureCause = cause;
+      } else if (cause instanceof Error) {
+        LOG.error("Exception of type Error.", cause);
+        sendFailure(cause, "Fatal Error cause TezChild exit.");
+        throw new TezException("Fatal Error cause TezChild exit.", cause);
+      } else {
+        failureCause = cause;
+      }
+    } finally {
+      taskReporter.unregisterTask(task.getTaskAttemptID());
+      // Clear the interrupted status of the blocking thread, in case it is set after the
+      // InterruptedException was invoked.
+      Thread.interrupted();
+    }
+
+    // Translate the recorded failure into the exception type declared by this method.
+    if (failureCause != null) {
+      if (failureCause instanceof FSError) {
+        // Not immediately fatal, this is an error reported by Hadoop FileSystem
+        LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+            failureCause);
+        throw (FSError) failureCause;
+      } else if (failureCause instanceof Error) {
+        LOG.error("Exception of type Error.", failureCause);
+        sendFailure(failureCause, "Fatal error cause TezChild exit.");
+        throw new TezException("Fatal error cause TezChild exit.", failureCause);
+      } else {
+        if (failureCause instanceof IOException) {
+          throw (IOException) failureCause;
+        } else if (failureCause instanceof TezException) {
+          throw (TezException) failureCause;
+        } else if (failureCause instanceof InterruptedException) {
+          throw (InterruptedException) failureCause;
+        } else {
+          throw new TezException(failureCause);
+        }
+      }
+    }
+    // The task finished without a failure; a shutdown may still have been requested meanwhile.
+    if (shutdownRequested.get()) {
+      LOG.info("Shutdown requested... returning");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Runs the task life cycle (initialize, run, close) as the configured user and reports the
+   * outcome through the task reporter. Marks the task as no longer running when it finishes,
+   * whether normally or exceptionally.
+   */
+  private class TaskRunnerCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+              task.initialize();
+              // Skip execution if the thread was interrupted or a failure was already registered
+              // while initializing.
+              if (!Thread.currentThread().isInterrupted() && firstException == null) {
+                LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+                task.run();
+                LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+                task.close();
+                task.setFrameworkCounters();
+              }
+              LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
+                  + ", fatalErrorOccurred=" + (firstException != null));
+              // Success is only reported when no failure has been registered so far.
+              if (firstException == null) {
+                try {
+                  taskReporter.taskSucceeded(task.getTaskAttemptID());
+                } catch (IOException e) {
+                  LOG.warn("Heartbeat failure caused by communication failure", e);
+                  maybeRegisterFirstException(e);
+                  // Falling off, since the runner thread checks for the registered exception.
+                } catch (TezException e) {
+                  LOG.warn("Heartbeat failure reported by AM", e);
+                  maybeRegisterFirstException(e);
+                  // Falling off, since the runner thread checks for the registered exception.
+                }
+              }
+              return null;
+            } catch (Throwable cause) {
+              if (cause instanceof FSError) {
+                // Not immediately fatal, this is an error reported by Hadoop FileSystem
+                maybeRegisterFirstException(cause);
+                LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+                    cause);
+                try {
+                  sendFailure(cause, "FS Error in Child JVM");
+                } catch (Exception ignored) {
+                  // Ignored since another cause is already known
+                  LOG.info(
+                      "Ignoring the following exception since a previous exception is already registered",
+                      ignored);
+                }
+                throw (FSError) cause;
+              } else if (cause instanceof Error) {
+                // Any other Error: report it and surface it wrapped in a TezException.
+                // Note that an exception thrown by sendFailure here propagates instead.
+                LOG.error("Exception of type Error.", cause);
+                sendFailure(cause, "Fatal Error cause TezChild exit.");
+                throw new TezException("Fatal Error cause TezChild exit.", cause);
+              } else {
+                // Unwrap so that the underlying cause is the one registered and reported.
+                if (cause instanceof UndeclaredThrowableException) {
+                  cause = ((UndeclaredThrowableException) cause).getCause();
+                }
+                maybeRegisterFirstException(cause);
+                LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
+                    cause);
+                try {
+                  sendFailure(cause, "Failure while running task");
+                } catch (Exception ignored) {
+                  // Ignored since another cause is already known
+                  LOG.info(
+                      "Ignoring the following exception since a previous exception is already registered",
+                      ignored);
+                }
+                // Rethrow using a type permitted by the enclosing signatures.
+                if (cause instanceof IOException) {
+                  throw (IOException) cause;
+                } else if (cause instanceof TezException) {
+                  throw (TezException) cause;
+                } else {
+                  throw new TezException(cause);
+                }
+              }
+            } finally {
+              // Always executed, on success as well as on failure.
+              task.cleanup();
+            }
+          }
+        });
+      } finally {
+        // From here on addEvents/canCommit/reportError treat the task as finished.
+        taskRunning.set(false);
+      }
+    }
+  }
+
+  /**
+   * Reports a task failure through the task reporter, at most once per runner. If a failure has
+   * already been reported, the throwable is only logged.
+   *
+   * <p>Should wait until all messages are sent to the AM before TezChild shutdown, if this
+   * method becomes async in future.
+   *
+   * @param t the failure cause
+   * @param message diagnostic message sent along with the failure
+   * @throws IOException if reporting the failure fails with an IOException
+   * @throws TezException if reporting the failure fails with a TezException
+   */
+  private void sendFailure(Throwable t, String message) throws IOException, TezException {
+    boolean alreadyReported = fatalErrorSent.getAndSet(true);
+    if (alreadyReported) {
+      LOG.warn("Ignoring fatal error since another error has already been reported", t);
+      return;
+    }
+    task.setFatalError(t, message);
+    task.setFrameworkCounters();
+    try {
+      taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
+    } catch (IOException commFailure) {
+      // Logged here and rethrown; the caller decides whether to ignore it.
+      LOG.warn("Heartbeat failure caused by communication failure", commFailure);
+      throw commFailure;
+    } catch (TezException amFailure) {
+      // Logged here and rethrown; the caller decides whether to ignore it.
+      LOG.warn("Heartbeat failure reported by AM", amFailure);
+      throw amFailure;
+    }
+  }
+
+  /**
+   * Forwards the events to the task reporter while the task is running; events arriving after
+   * the task has finished are dropped.
+   */
+  @Override
+  public void addEvents(Collection<TezEvent> events) {
+    if (!taskRunning.get()) {
+      return;
+    }
+    taskReporter.addEvents(task.getTaskAttemptID(), events);
+  }
+
+  @Override
+  public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
+      String message, EventMetaData sourceInfo) {
+    // This can be called before a task throws an exception or after it.
+    // If called before a task throws an exception
+    // - ensure a heartbeat is sent with the diagnostics, and sent only once.
+    // - interrupt the waiting thread, and make it throw the reported error.
+    // If called after a task throws an exception, the waiting task has already returned, no point
+    // interrupting it.
+    // This case can be effectively ignored (log), as long as the run() method ends up throwing the
+    // exception.
+    //
+    //
+    if (!fatalErrorSent.getAndSet(true)) {
+      maybeRegisterFirstException(t);
+      try {
+        taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+      } catch (IOException e) {
+        // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+        // occurred earlier.
+        LOG.warn("Heartbeat failure caused by communication failure", e);
+      } catch (TezException e) {
+        // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+        // occurred earlier.
+        LOG.warn("Heartbeat failure reported by AM", e);
+      } finally {
+        // Wake up the waiting thread so that it can return control
+        waitingThread.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Asks the task reporter whether the given attempt may commit.
+   *
+   * @return the reporter's answer while the task is running; false if the task is no longer
+   *         running or the request failed with an IOException
+   */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+    if (!taskRunning.get()) {
+      return false;
+    }
+    try {
+      return taskReporter.canCommit(taskAttemptID);
+    } catch (IOException commFailure) {
+      LOG.warn("Communication failure while trying to commit", commFailure);
+      maybeRegisterFirstException(commFailure);
+      waitingThread.interrupt();
+      // Not informing the task since it will be interrupted.
+      // TODO: Should this be sent to the task as well, current Processors, etc do not handle
+      // interrupts very well.
+      return false;
+    }
+  }
+
+  @Override
+  public synchronized void reportError(Throwable t) {
+   if (taskRunning.get()) {
+      LOG.error("TaskReporter reported error", t);
+      maybeRegisterFirstException(t);
+      waitingThread.interrupt();
+      // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+      // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+      // method does not throw an exception, in which case task success is registered with the AM.
+      // Leave this handling to the next getTask / actual task.
+    } else {
+      LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+          + " is already complete");
+    }
+  }
+
+  @Override
+  public void shutdownRequested() {
+    shutdownRequested.set(true);
+    waitingThread.interrupt();
+  }
+
+  /**
+   * Builds the diagnostics string sent with a failure from the throwable and/or the message.
+   *
+   * @param t the failure cause, may be null
+   * @param message the error message, may be null
+   * @return "Unknown error" if both are null, otherwise the stack trace and/or the message
+   */
+  private String getTaskDiagnosticsString(Throwable t, String message) {
+    if (t == null) {
+      if (message == null) {
+        return "Unknown error";
+      }
+      return " errorMessage=" + message;
+    }
+    String tracePart = "exceptionThrown=" + ExceptionUtils.getStackTrace(t);
+    if (message == null) {
+      return tracePart;
+    }
+    return tracePart + ", errorMessage=" + message;
+  }
+
+  /**
+   * Remembers the given throwable as the first exception, unless one is already registered.
+   */
+  private synchronized void maybeRegisterFirstException(Throwable t) {
+    if (firstException != null) {
+      // Keep the earliest registered failure.
+      return;
+    }
+    firstException = t;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
new file mode 100644
index 0000000..4f94cfe
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -0,0 +1,729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+// Tests in this class cannot be run in parallel.
+public class TestTaskExecution {
+
+  private static final Logger LOG = Logger.getLogger(TestTaskExecution.class);
+
+  // Message expected in the IOException surfaced by testHeartbeatException.
+  private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
+  private static final Configuration defaultConf = new Configuration();
+  private static final FileSystem localFs;
+  private static final Path workDir;
+
+  // Single-threaded executor on which the tests run TezTaskRunner.run(); shared by all tests
+  // and shut down in shutdown().
+  private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
+
+  // Configures the local file system and derives the working directory from the
+  // "test.build.data" system property (defaulting to /tmp) plus this class's simple name.
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
+          TestTaskExecution.class.getSimpleName());
+      workDir = localFs.makeQualified(wd);
+    } catch (IOException e) {
+      // Fail class initialization; the tests cannot run without the local file system.
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Resets the static signalling state of TestProcessor before each test.
+  @Before
+  public void reset() {
+    TestProcessor.reset();
+  }
+
+  // Stops the shared executor once all tests in this class have run.
+  @AfterClass
+  public static void shutdown() {
+    taskExecutor.shutdownNow();
+  }
+
+  // Runs one task with the empty processor configuration and expects run() to return true.
+  @Test(timeout = 5000)
+  public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      boolean result = taskRunnerFuture.get();
+      assertTrue(result);
+      // The reporter should no longer be tracking a task once run() has returned.
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Runs two tasks back to back against the same reporter, umbilical and executor, and expects
+  // both runs to return true.
+  @Test(timeout = 5000)
+  public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      boolean result = taskRunnerFuture.get();
+      assertTrue(result);
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+      // Clear the events tracked for the first task before running the second one.
+      umbilical.resetTrackedEvents();
+
+      taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      result = taskRunnerFuture.get();
+      assertTrue(result);
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Tests that a task fails when the Processor throws an exception: run() is expected to
+  // surface a TezException and a task-failed event is expected on the umbilical.
+  @Test(timeout = 5000)
+  public void testFailedTask() throws IOException, InterruptedException, TezException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+      try {
+        taskRunnerFuture.get();
+        fail("Expecting the task to fail");
+      } catch (ExecutionException e) {
+        // The exception thrown by TezTaskRunner.run() arrives wrapped by the Future.
+        Throwable cause = e.getCause();
+        LOG.info(cause.getClass().getName());
+        assertTrue(cause instanceof TezException);
+      }
+
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezException: TezException");
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Tests that a task fails when the Processor class cannot be found: run() is expected to
+  // surface a TezException and a task-failed event is expected on the umbilical.
+  @Test(timeout = 5000)
+  public void testFailedTask2() throws IOException, InterruptedException, TezException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      try {
+        taskRunnerFuture.get();
+        // As in testFailedTask: a normal return means the runner did not surface the failure.
+        fail("Expecting the task to fail");
+      } catch (ExecutionException e) {
+        // The exception thrown by TezTaskRunner.run() arrives wrapped by the Future.
+        Throwable cause = e.getCause();
+        LOG.info(cause.getClass().getName());
+        assertTrue(cause instanceof TezException);
+      }
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezUncheckedException: "
+            + "Unable to load class: NotExitedProcessor");
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Makes the umbilical throw while the processor is still waiting for its signal, and expects
+  // run() to fail with an IOException carrying the heartbeat message and the processor to be
+  // interrupted.
+  @Test(timeout = 5000)
+  public void testHeartbeatException() throws IOException, InterruptedException, TezException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Wait for the processor to start, then make the umbilical throw
+      TestProcessor.awaitStart();
+      umbilical.signalThrowException();
+      umbilical.awaitRegisteredEvent();
+      // Not signaling an actual start to verify task interruption
+      try {
+        taskRunnerFuture.get();
+        fail("Expecting the task to fail");
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        assertTrue(cause instanceof IOException);
+        assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
+      }
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      // No completion events since umbilical communication already failed.
+      umbilical.verifyNoCompletionEvents();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Makes the umbilical send a should-die response while the processor is still waiting for its
+  // signal, and expects run() to return false and the processor to be interrupted.
+  @Test(timeout = 5000)
+  public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      // Wait for the processor to start, then make the umbilical request a shutdown
+      TestProcessor.awaitStart();
+      umbilical.signalSendShouldDie();
+      umbilical.awaitRegisteredEvent();
+      // Not signaling an actual start to verify task interruption
+
+      // run() returns false when a shutdown was requested.
+      boolean result = taskRunnerFuture.get();
+      assertFalse(result);
+
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      // TODO Is this statement correct ?
+      // No completion events since shouldDie was requested by the AM, which should have killed the
+      // task.
+      umbilical.verifyNoCompletionEvents();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Runs a ContainerReporter against the test umbilical and expects it to complete after
+  // exactly one getTask invocation.
+  @Test(timeout = 5000)
+  public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+
+      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
+      ContainerContext containerContext = new ContainerContext(containerId.toString());
+
+      ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
+      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+
+      // Block until the reporter's call completes.
+      getTaskFuture.get();
+      assertEquals(1, umbilical.getTaskInvocations);
+
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Potential new tests
+  // Different states - initialization failure, close failure
+  // getTask states
+
+  /** Adapts {@link TezTaskRunner#run()} to a Callable so that it can be submitted to an executor. */
+  private static class TaskRunnerCallable implements Callable<Boolean> {
+    private final TezTaskRunner runner;
+
+    public TaskRunnerCallable(TezTaskRunner taskRunner) {
+      runner = taskRunner;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return runner.run();
+    }
+  }
+
+  // Uses static fields for signaling. Ensure only used by one test at a time.
+  public static class TestProcessor extends AbstractLogicalIOProcessor {
+
+    public static final byte[] CONF_EMPTY = new byte[] { 0 };
+    public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
+    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
+
+    private static final Logger LOG = Logger.getLogger(TestProcessor.class);
+
+    private static final ReentrantLock processorLock = new ReentrantLock();
+    private static final Condition processorCondition = processorLock.newCondition();
+    private static final Condition completionCondition = processorLock.newCondition();
+    private static final Condition runningCondition = processorLock.newCondition();
+    private static boolean completed = false;
+    private static boolean running = false;
+    private static boolean signalled = false;
+
+    public static boolean receivedInterrupt = false;
+
+    private boolean throwIOException = false;
+    private boolean throwTezException = false;
+    private boolean signalFatalAndThrow = false;
+    private boolean signalFatalAndLoop = false;
+    private boolean signalFatalAndComplete = false;
+
+    public TestProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      parseConf(getContext().getUserPayload().deepCopyAsArray());
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    // Decodes byte 0 as a mask of the CONF_* flags; "!= 0" because (x & 1) > 1 is never true.
+    private void parseConf(byte[] bytes) {
+      throwIOException = (bytes[0] & 1) != 0;
+      throwTezException = (bytes[0] & 2) != 0;
+      signalFatalAndThrow = (bytes[0] & 4) != 0;
+      signalFatalAndLoop = (bytes[0] & 8) != 0;
+      signalFatalAndComplete = (bytes[0] & 16) != 0;
+    }
+
+    public static void reset() {
+      signalled = false;
+      receivedInterrupt = false;
+      completed = false;
+      running = false;
+    }
+
+    public static void signal() {
+      LOG.info("Signalled");
+      processorLock.lock();
+      try {
+        signalled = true;
+        processorCondition.signal();
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    // Blocks until run() has set 'running'; loops because Condition.await() may wake spuriously.
+    public static void awaitStart() throws InterruptedException {
+      LOG.info("Awaiting Process run");
+      processorLock.lock();
+      try {
+        while (!running) {
+          runningCondition.await();
+        }
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    // Blocks until run() has finished and set 'completed'. The flag is re-checked in a loop
+    // because Condition.await() is allowed to wake up spuriously.
+    public static void awaitCompletion() throws InterruptedException {
+      LOG.info("Await completion");
+      processorLock.lock();
+      try {
+        while (!completed) {
+          completionCondition.await();
+        }
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    public static boolean wasInterrupted() {
+      processorLock.lock();
+      try {
+        return receivedInterrupt;
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    // Marks the processor as running, waits for signal(), then acts out the behaviour selected
+    // by the parsed CONF_* flags. The wait loops because await() can wake up spuriously.
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+        Exception {
+      processorLock.lock();
+      try {
+        running = true;
+        runningCondition.signal();
+        try {
+          LOG.info("Signal is: " + signalled);
+          while (!signalled) {
+            LOG.info("Waiting for processor signal");
+            processorCondition.await();
+          }
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+          }
+          LOG.info("Received processor signal");
+          if (throwIOException) {
+            throw new IOException();
+          } else if (throwTezException) {
+            throw new TezException("TezException");
+          } else if (signalFatalAndThrow) {
+            IOException io = new IOException("FATALERROR");
+            getContext().fatalError(io, "FATALERROR");
+            throw io;
+          } else if (signalFatalAndComplete) {
+            getContext().fatalError(new IOException("FATALERROR"), "FATALERROR");
+            return;
+          } else if (signalFatalAndLoop) {
+            getContext().fatalError(new IOException("FATALERROR"), "FATALERROR");
+            LOG.info("Waiting for Processor signal again");
+            processorCondition.await();
+            LOG.info("Received second processor signal");
+          }
+        } catch (InterruptedException e) { // recorded for wasInterrupted() rather than rethrown
+          receivedInterrupt = true;
+        }
+      } finally {
+        completed = true;
+        completionCondition.signal();
+        processorLock.unlock();
+      }
+    }
+  }
+
+  private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
+
+    private static final Logger LOG = Logger.getLogger(TezTaskUmbilicalForTest.class);
+
+    private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
+
+    private final ReentrantLock umbilicalLock = new ReentrantLock();
+    private final Condition eventCondition = umbilicalLock.newCondition();
+    private boolean pendingEvent = false;
+    private boolean eventEnacted = false;
+
+    volatile int getTaskInvocations = 0;
+
+    private boolean shouldThrowException = false;
+    private boolean shouldSendDieSignal = false;
+
+    public void signalThrowException() {
+      umbilicalLock.lock();
+      try {
+        shouldThrowException = true;
+        pendingEvent = true;
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void signalSendShouldDie() {
+      umbilicalLock.lock();
+      try {
+        shouldSendDieSignal = true;
+        pendingEvent = true;
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    // Blocks until heartbeat() has set 'eventEnacted'; loops to tolerate spurious wakeups.
+    public void awaitRegisteredEvent() throws InterruptedException {
+      umbilicalLock.lock();
+      try {
+        while (!eventEnacted) {
+          LOG.info("Awaiting event");
+          eventCondition.await();
+        }
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void resetTrackedEvents() {
+      umbilicalLock.lock();
+      try {
+        requestEvents.clear();
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    // Fails the test if any event recorded by heartbeat() is a task-attempt failed/completed event.
+    public void verifyNoCompletionEvents() {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+            fail("Found a TaskAttemptFailedEvent when not expected");
+          } else if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+            fail("Found a TaskAttemptCompletedEvent when not expected");
+          }
+        }
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    // Requires the first recorded TaskAttemptFailedEvent's diagnostics to start with the prefix.
+    public void verifyTaskFailedEvent(String diagnostics) {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent recorded : requestEvents) {
+          if (recorded.getEvent() instanceof TaskAttemptFailedEvent) {
+            TaskAttemptFailedEvent failure = (TaskAttemptFailedEvent) recorded.getEvent();
+            if (failure.getDiagnostics().startsWith(diagnostics)) {
+              return;
+            }
+            fail("No detailed diagnostics message in TaskAttemptFailedEvent");
+          }
+        }
+        fail("No TaskAttemptFailedEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void verifyTaskSuccessEvent() { // passes iff a TaskAttemptCompletedEvent was recorded
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+            return;
+          }
+        }
+        fail("No TaskAttemptCompletedEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+        int clientMethodsHash) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
+      // Return shouldDie = true
+      getTaskInvocations++;
+      return new ContainerTask(null, true, null, null, false);
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      umbilicalLock.lock();
+      try { // all work after lock() stays inside try so the lock is released on any failure
+        if (request.getEvents() != null) {
+          requestEvents.addAll(request.getEvents());
+        }
+        if (shouldThrowException) {
+          LOG.info("TestUmbilical throwing Exception");
+          throw new IOException(HEARTBEAT_EXCEPTION_STRING);
+        }
+        TezHeartbeatResponse response = new TezHeartbeatResponse();
+        response.setLastRequestId(request.getRequestId());
+        if (shouldSendDieSignal) {
+          LOG.info("TestUmbilical returning shouldDie=true");
+          response.setShouldDie();
+        }
+        return response;
+      } finally {
+        if (pendingEvent) { // a signal*() call was made earlier: wake awaitRegisteredEvent()
+          eventEnacted = true;
+          LOG.info("Signalling Event");
+          eventCondition.signal();
+        }
+        umbilicalLock.unlock();
+      }
+    }
+  }
+
+  // Builds a TaskReporter wired to the test umbilical and a container id derived from appId.
+  private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
+    String containerIdStr = createContainerId(appId).toString();
+    return new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0), containerIdStr);
+  }
+
+  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
+      TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
+      throws IOException {
+    return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
+        processorConf);
+  }
+
+  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
+      TaskReporter taskReporter, ListeningExecutorService executor, String processorClass, byte[] processorConf) throws IOException{
+    TezConfiguration tezConf = new TezConfiguration(defaultConf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Path testDir = new Path(workDir, UUID.randomUUID().toString());
+    String[] localDirs = new String[] { testDir.toString() };
+    // Fixed ids: every level (DAG, vertex, task, attempt) is created with id 1 under appId.
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+    ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
+    TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
+        new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+    // Remaining runner arguments are fixed placeholders (empty maps, null, "", "localhost").
+    TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1,
+        new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter,
+        executor, null, "", new ExecutionContextImpl("localhost"));
+    return taskRunner;
+  }
+
+  // Returns the ContainerId created with id 1 under attempt 1 of the given application.
+  private ContainerId createContainerId(ApplicationId appId) {
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    return ContainerId.newInstance(attemptId, 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
new file mode 100644
index 0000000..9add252
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Exercises {@link TaskReporter.HeartbeatCallable} against a mocked umbilical. */
+public class TestTaskReporter {
+  // Expects exactly three heartbeats: two answered with 5 events each, a third with 1 event.
+  @Test(timeout = 10000)
+  public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
+    final Object lock = new Object();
+    final AtomicBoolean hb2Done = new AtomicBoolean(false);
+
+    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
+        if (request.getRequestId() == 1 || request.getRequestId() == 2) {
+          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
+          response.setLastRequestId(request.getRequestId());
+          return response;
+        } else if (request.getRequestId() == 3) {
+          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
+          response.setLastRequestId(request.getRequestId());
+          synchronized (lock) {
+            hb2Done.set(true);
+            lock.notify();
+          }
+          return response;
+        } else {
+          throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
+        }
+      }
+    }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
+
+    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+    doReturn("vertexName").when(mockTask).getVertexName();
+    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+
+    // Setup the sleep time to be way higher than the test timeout
+    TaskReporter.HeartbeatCallable heartbeatCallable =
+        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+            new AtomicLong(0), "containerIdStr");
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    executor.submit(heartbeatCallable);
+    try {
+      synchronized (lock) {
+        // Object.wait() may wake spuriously, so loop until the third heartbeat was answered.
+        while (!hb2Done.get()) {
+          lock.wait();
+        }
+      }
+      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+      // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
+      Thread.sleep(2000L);
+      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // Creates a list holding numEvents mocked TezEvent instances.
+  private List<TezEvent> createEvents(int numEvents) {
+    List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
+    for (int i = 0; i < numEvents; i++) {
+      list.add(mock(TezEvent.class));
+    }
+    return list;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e01b985..629bab8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -32,10 +32,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -62,10 +62,7 @@ public class ShuffleUtils {
 
   public static ByteBuffer convertJobTokenToBytes(
       Token<JobTokenIdentifier> jobToken) throws IOException {
-    DataOutputBuffer dob = new DataOutputBuffer();
-    jobToken.write(dob);
-    ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    return bb;
+    return TezCommonUtils.convertJobTokenToBytes(jobToken);
   }
 
   public static int deserializeShuffleProviderMetaData(ByteBuffer meta)


[3/3] tez git commit: TEZ-1933. Move TezChild and related classes into tez-runtime-internals. (sseth)

Posted by ss...@apache.org.
TEZ-1933. Move TezChild and related classes into tez-runtime-internals.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e250983e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e250983e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e250983e

Branch: refs/heads/master
Commit: e250983e57ba7fb60d71fb0b11aca331b209a876
Parents: fda4c0b
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 12 14:31:15 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 12 14:31:15 2015 -0800

----------------------------------------------------------------------
 .../org/apache/tez/common/TezCommonUtils.java   |  11 +
 .../tez/dag/utils/RelocalizationUtils.java      |  95 +++
 .../tez/dag/utils/RelocalizationUtils.java      |  93 ---
 .../tez/runtime/task/ContainerReporter.java     |  84 ---
 .../apache/tez/runtime/task/ErrorReporter.java  |  26 -
 .../apache/tez/runtime/task/TaskReporter.java   | 401 ----------
 .../org/apache/tez/runtime/task/TezChild.java   | 456 ------------
 .../apache/tez/runtime/task/TezTaskRunner.java  | 375 ----------
 .../tez/runtime/task/TestTaskExecution.java     | 711 ------------------
 .../tez/runtime/task/TestTaskReporter.java      | 115 ---
 .../tez/runtime/task/ContainerReporter.java     |  84 +++
 .../apache/tez/runtime/task/ErrorReporter.java  |  26 +
 .../apache/tez/runtime/task/TaskReporter.java   | 401 ++++++++++
 .../org/apache/tez/runtime/task/TezChild.java   | 456 ++++++++++++
 .../apache/tez/runtime/task/TezTaskRunner.java  | 375 ++++++++++
 .../tez/runtime/task/TestTaskExecution.java     | 729 +++++++++++++++++++
 .../tez/runtime/task/TestTaskReporter.java      | 114 +++
 .../library/common/shuffle/ShuffleUtils.java    |   7 +-
 18 files changed, 2293 insertions(+), 2266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index b7a402d..685c728 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -19,6 +19,7 @@
 package org.apache.tez.common;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -39,10 +40,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -385,6 +388,14 @@ public class TezCommonUtils {
     return sb.toString();
   }
 
+  public static ByteBuffer convertJobTokenToBytes(
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    // Write the token into a growable buffer, then wrap only the bytes actually written.
+    DataOutputBuffer serialized = new DataOutputBuffer();
+    jobToken.write(serialized);
+    return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
+  }
+
   public static void logCredentials(Log log, Credentials credentials, String identifier) {
     if (log.isDebugEnabled()) {
       log.debug(getCredentialsInfo(credentials, identifier));

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
new file mode 100644
index 0000000..de0b94c
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.common.collect.Lists;
+
+/** Static helpers that download additional resources and compute SHA-256 digests. */
+@InterfaceAudience.Private
+public class RelocalizationUtils {
+
+  // Copies every entry of additionalResources into destDir (named by its key) and returns the
+  // URLs of the local copies; a null or empty map yields an empty list.
+  public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
+      Configuration conf, String destDir) throws IOException, TezException {
+    if (additionalResources == null || additionalResources.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<URL> localUrls = Lists.newArrayListWithCapacity(additionalResources.size());
+    for (Entry<String, URI> resource : additionalResources.entrySet()) {
+      Path localCopy = downloadResource(resource.getKey(), resource.getValue(), conf, destDir);
+      localUrls.add(localCopy.toUri().toURL());
+    }
+    return localUrls;
+  }
+
+  // Passes the URLs to ReflectionUtils.addResourcesToSystemClassLoader.
+  public static void addUrlsToClassPath(List<URL> urls) {
+    ReflectionUtils.addResourcesToSystemClassLoader(urls);
+  }
+
+  // Copies the resource at uri to destDir/destName using the URI's FileSystem and returns the
+  // destination path qualified against the local file system's URI.
+  private static Path downloadResource(String destName, URI uri, Configuration conf, String destDir)
+      throws IOException {
+    FileSystem sourceFs = FileSystem.get(uri, conf);
+    Path targetDir = new Path(destDir);
+    Path target = new Path(targetDir, destName);
+    sourceFs.copyToLocalFile(new Path(uri), target);
+    return target.makeQualified(FileSystem.getLocal(conf).getUri(), targetDir);
+  }
+
+  // Returns the SHA-256 digest of the given path, opened through the local file system.
+  public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
+    InputStream in = FileSystem.getLocal(conf).open(path);
+    try {
+      return DigestUtils.sha256(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  // Returns the SHA-256 digest of the resource at uri, opened through that URI's FileSystem.
+  public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
+    InputStream in = FileSystem.get(uri, conf).open(new Path(uri));
+    try {
+      return DigestUtils.sha256(in);
+    } finally {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
deleted file mode 100644
index 7b5c49c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.utils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezException;
-
-import com.google.common.collect.Lists;
-
-public class RelocalizationUtils {
-  
-  public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
-      Configuration conf, String destDir) throws IOException, TezException {
-    if (additionalResources == null || additionalResources.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
-
-    for (Entry<String, URI> lrEntry : additionalResources.entrySet()) {
-      Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf, destDir);
-      urls.add(dFile.toUri().toURL());
-    }
-    return urls;
-  }
-
-  public static void addUrlsToClassPath(List<URL> urls) {
-    ReflectionUtils.addResourcesToSystemClassLoader(urls);
-  }
-
-  private static Path downloadResource(String destName, URI uri, Configuration conf, String destDir)
-      throws IOException {
-    FileSystem fs = FileSystem.get(uri, conf);
-    Path cwd = new Path(destDir);
-    Path dFile = new Path(cwd, destName);
-    Path srcPath = new Path(uri);
-    fs.copyToLocalFile(srcPath, dFile);
-    return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
-  }
-
-  public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
-    InputStream is = null;
-    try {
-      is = FileSystem.getLocal(conf).open(path);
-      return DigestUtils.sha256(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
-  }
-
-  public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
-    InputStream is = null;
-    try {
-      is = FileSystem.get(uri, conf).open(new Path(uri));
-      return DigestUtils.sha256(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
deleted file mode 100644
index a68c7c1..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-
-/**
- * Responsible for communication between a running Container and the ApplicationMaster. The main
- * functionality is to poll for new tasks.
- * 
- */
-public class ContainerReporter implements Callable<ContainerTask> {
-
-  private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
-
-  private final TezTaskUmbilicalProtocol umbilical;
-  private final ContainerContext containerContext;
-  private final int getTaskMaxSleepTime;
-  private final long LOG_INTERVAL = 2000l;
-
-  private long nextGetTaskPrintTime;
-
-  ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
-      int getTaskMaxSleepTime) {
-    this.umbilical = umbilical;
-    this.containerContext = containerContext;
-    this.getTaskMaxSleepTime = getTaskMaxSleepTime;
-  }
-
-  @Override
-  public ContainerTask call() throws Exception {
-    ContainerTask containerTask = null;
-    LOG.info("Attempting to fetch new task");
-    containerTask = umbilical.getTask(containerContext);
-    long getTaskPollStartTime = System.currentTimeMillis();
-    nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
-    for (int idle = 1; containerTask == null; idle++) {
-      long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
-      maybeLogSleepMessage(sleepTimeMilliSecs);
-      TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
-      containerTask = umbilical.getTask(containerContext);
-    }
-    LOG.info("Got TaskUpdate: "
-        + (System.currentTimeMillis() - getTaskPollStartTime)
-        + " ms after starting to poll."
-        + " TaskInfo: shouldDie: "
-        + containerTask.shouldDie()
-        + (containerTask.shouldDie() == true ? "" : ", currentTaskAttemptId: "
-            + containerTask.getTaskSpec().getTaskAttemptID()));
-    return containerTask;
-  }
-
-  private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
-    long currentTime = System.currentTimeMillis();
-    if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
-      LOG.info("Sleeping for " + sleepTimeMilliSecs
-          + "ms before retrying getTask again. Got null now. "
-          + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
-      nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
deleted file mode 100644
index 1146ce4..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-public interface ErrorReporter {
-
-  void reportError(Throwable t);
-  
-  void shutdownRequested();
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
deleted file mode 100644
index 15dcbb0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Responsible for communication between tasks running in a Container and the ApplicationMaster.
- * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
- * retrieve events specific to this task.
- * 
- */
-public class TaskReporter {
-
-  private static final Logger LOG = Logger.getLogger(TaskReporter.class);
-
-  private final TezTaskUmbilicalProtocol umbilical;
-  private final long pollInterval;
-  private final long sendCounterInterval;
-  private final int maxEventsToGet;
-  private final AtomicLong requestCounter;
-  private final String containerIdStr;
-
-  private final ListeningExecutorService heartbeatExecutor;
-
-  @VisibleForTesting
-  HeartbeatCallable currentCallable;
-
-  public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
-      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
-    this.umbilical = umbilical;
-    this.pollInterval = amPollInterval;
-    this.sendCounterInterval = sendCounterInterval;
-    this.maxEventsToGet = maxEventsToGet;
-    this.requestCounter = requestCounter;
-    this.containerIdStr = containerIdStr;
-    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
-    heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
-  }
-
-  /**
-   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
-   */
-  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
-      ErrorReporter errorReporter) {
-    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
-        maxEventsToGet, requestCounter, containerIdStr);
-    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
-    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
-  }
-
-  /**
-   * This method should always be invoked before setting up heartbeats for another task running in
-   * the same container.
-   */
-  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
-    currentCallable.markComplete();
-    currentCallable = null;
-  }
-  
-  public void shutdown() {
-    heartbeatExecutor.shutdownNow();
-  }
-
-  @VisibleForTesting
-  static class HeartbeatCallable implements Callable<Boolean> {
-
-    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
-    private static final float LOG_COUNTER_BACKOFF = 1.3f;
-
-    private final LogicalIOProcessorRuntimeTask task;
-    private EventMetaData updateEventMetadata;
-
-    private final TezTaskUmbilicalProtocol umbilical;
-
-    private final long pollInterval;
-    private final long sendCounterInterval;
-    private final int maxEventsToGet;
-    private final String containerIdStr;
-
-    private final AtomicLong requestCounter;
-
-    private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
-
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Condition condition = lock.newCondition();
-
-    /*
-     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
-     * log counters.
-     */
-    private int nonOobHeartbeatCounter = 0;
-    private int nextHeartbeatNumToLog = 0;
-    /*
-     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM. 
-     */
-    private int prevCounterSendHeartbeatNum = 0;
-
-    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
-        TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
-        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
-
-      this.pollInterval = amPollInterval;
-      this.sendCounterInterval = sendCounterInterval;
-      this.maxEventsToGet = maxEventsToGet;
-      this.requestCounter = requestCounter;
-      this.containerIdStr = containerIdStr;
-
-      this.task = task;
-      this.umbilical = umbilical;
-      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
-          task.getVertexName(), "", task.getTaskAttemptID());
-
-      nextHeartbeatNumToLog = (Math.max(1,
-          (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
-              : (float) amPollInterval))));
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-      // Heartbeat only for active tasks. Errors, etc will be reported directly.
-      while (!task.isTaskDone() && !task.hadFatalError()) {
-        ResponseWrapper response = heartbeat(null);
-
-        if (response.shouldDie) {
-          // AM sent a shouldDie=true
-          LOG.info("Asked to die via task heartbeat");
-          return false;
-        } else {
-          if (response.numEvents < maxEventsToGet) {
-            // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
-            lock.lock();
-            try {
-              boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
-              if (!interrupted) {
-                nonOobHeartbeatCounter++;
-              }
-            } finally {
-              lock.unlock();
-            }
-          }
-        }
-      }
-      int pendingEventCount = eventsToSend.size();
-      if (pendingEventCount > 0) {
-        LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
-      }
-      return true;
-    }
-
-    /**
-     * @param eventsArg
-     * @return
-     * @throws IOException
-     *           indicates an RPC communication failure.
-     * @throws TezException
-     *           indicates an exception somewhere in the AM.
-     */
-    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
-        TezException {
-
-      if (eventsArg != null) {
-        eventsToSend.addAll(eventsArg);
-      }
-
-      TezEvent updateEvent = null;
-      List<TezEvent> events = new ArrayList<TezEvent>();
-      eventsToSend.drainTo(events);
-
-      if (!task.isTaskDone() && !task.hadFatalError()) {
-        TezCounters counters = null;
-        /**
-         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
-         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
-         * real time decisions are made based on these counters, it can be sent once per second.
-         */
-        // Not completely accurate, since OOB heartbeats could go out.
-        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
-          counters = task.getCounters();
-          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
-        }
-        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
-            updateEventMetadata);
-        events.add(updateEvent);
-      }
-
-      long requestId = requestCounter.incrementAndGet();
-      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
-          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Sending heartbeat to AM, request=" + request);
-      }
-
-      maybeLogCounters();
-
-      TezHeartbeatResponse response = umbilical.heartbeat(request);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Received heartbeat response from AM, response=" + response);
-      }
-
-      if (response.shouldDie()) {
-        LOG.info("Received should die response from AM");
-        return new ResponseWrapper(true, 1);
-      }
-      if (response.getLastRequestId() != requestId) {
-        throw new TezException("AM and Task out of sync" + ", responseReqId="
-            + response.getLastRequestId() + ", expectedReqId=" + requestId);
-      }
-
-      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
-      // are running using the same umbilical.
-      int numEventsReceived = 0;
-      if (task.isTaskDone() || task.hadFatalError()) {
-        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          LOG.warn("Current task already complete, Ignoring all event in"
-              + " heartbeat response, eventCount=" + response.getEvents().size());
-        }
-      } else {
-        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
-                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
-          }
-          // This should ideally happen in a separate thread
-          numEventsReceived = response.getEvents().size();
-          task.handleEvents(response.getEvents());
-        }
-      }
-      return new ResponseWrapper(false, numEventsReceived);
-    }
-
-    public void markComplete() {
-      // Notify to clear pending events, if any.
-      lock.lock();
-      try {
-        condition.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    private void maybeLogCounters() {
-      if (LOG.isDebugEnabled()) {
-        if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
-          LOG.debug("Counters: " + task.getCounters().toShortString());
-          nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
-        }
-      }
-    }
-
-    /**
-     * Sends out final events for task success.
-     * @param taskAttemptID
-     * @return
-     * @throws IOException
-     *           indicates an RPC communication failure.
-     * @throws TezException
-     *           indicates an exception somewhere in the AM.
-     */
-    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
-      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
-          updateEventMetadata);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
-    }
-
-    /**
-     * Sends out final events for task failure.
-     * @param taskAttemptID
-     * @param t
-     * @param diagnostics
-     * @param srcMeta
-     * @return
-     * @throws IOException
-     *           indicates an RPC communication failure.
-     * @throws TezException
-     *           indicates an exception somewhere in the AM.
-     */
-    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
-        EventMetaData srcMeta) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
-      if (diagnostics == null) {
-        diagnostics = ExceptionUtils.getStackTrace(t);
-      } else {
-        diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
-      }
-      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-          srcMeta == null ? updateEventMetadata : srcMeta);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
-    }
-
-    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
-      if (events != null && !events.isEmpty()) {
-        eventsToSend.addAll(events);
-      }
-    }
-  }
-
-  private static class HeartbeatCallback implements FutureCallback<Boolean> {
-
-    private final ErrorReporter errorReporter;
-
-    HeartbeatCallback(ErrorReporter errorReporter) {
-      this.errorReporter = errorReporter;
-    }
-
-    @Override
-    public void onSuccess(Boolean result) {
-      if (result == false) {
-        errorReporter.shutdownRequested();
-      }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      errorReporter.reportError(t);
-    }
-  }
-
-  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
-    return currentCallable.taskSucceeded(taskAttemptID);
-  }
-
-  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
-      EventMetaData srcMeta) throws IOException, TezException {
-    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
-  }
-
-  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
-    currentCallable.addEvents(taskAttemptID, events);
-  }
-
-  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
-    return umbilical.canCommit(taskAttemptID);
-  }
-
-  private static final class ResponseWrapper {
-    boolean shouldDie;
-    int numEvents;
-
-    private ResponseWrapper(boolean shouldDie, int numEvents) {
-      this.shouldDie = shouldDie;
-      this.numEvents = numEvents;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
deleted file mode 100644
index ae77709..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezLocalResource;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.Limits;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.RelocalizationUtils;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class TezChild {
-
-  private static final Logger LOG = Logger.getLogger(TezChild.class);
-
-  private final Configuration defaultConf;
-  private final String containerIdString;
-  private final int appAttemptNumber;
-  private final String[] localDirs;
-
-  private final AtomicLong heartbeatCounter = new AtomicLong(0);
-
-  private final int getTaskMaxSleepTime;
-  private final int amHeartbeatInterval;
-  private final long sendCounterInterval;
-  private final int maxEventsToGet;
-  private final boolean isLocal;
-  private final String workingDir;
-
-  private final ListeningExecutorService executor;
-  private final ObjectRegistryImpl objectRegistry;
-  private final String pid;
-  private final ExecutionContext ExecutionContext;
-  private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-  private final Map<String, String> serviceProviderEnvMap;
-
-  private Multimap<String, String> startedInputsMap = HashMultimap.create();
-
-  private TaskReporter taskReporter;
-  private TezTaskUmbilicalProtocol umbilical;
-  private int taskCount = 0;
-  private TezVertexID lastVertexID;
-
-  public TezChild(Configuration conf, String host, int port, String containerIdentifier,
-      String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
-      Map<String, String> serviceProviderEnvMap,
-      ObjectRegistryImpl objectRegistry, String pid,
-      ExecutionContext ExecutionContext)
-      throws IOException, InterruptedException {
-    this.defaultConf = conf;
-    this.containerIdString = containerIdentifier;
-    this.appAttemptNumber = appAttemptNumber;
-    this.localDirs = localDirs;
-    this.serviceProviderEnvMap = serviceProviderEnvMap;
-    this.workingDir = workingDir;
-    this.pid = pid;
-    this.ExecutionContext = ExecutionContext;
-
-    getTaskMaxSleepTime = defaultConf.getInt(
-        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
-        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
-
-    amHeartbeatInterval = defaultConf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
-    sendCounterInterval = defaultConf.getLong(
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
-
-    maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
-        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
-
-    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("TezChild").build());
-    this.executor = MoreExecutors.listeningDecorator(executor);
-
-    this.objectRegistry = objectRegistry;
-
-    // Security framework already loaded the tokens into current ugi
-    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing with tokens:");
-      for (Token<?> token : credentials.getAllTokens()) {
-        LOG.debug(token);
-      }
-    }
-
-    this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
-        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
-    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
-    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
-
-    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
-        ShuffleUtils.convertJobTokenToBytes(jobToken));
-
-    if (!isLocal) {
-      final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
-      SecurityUtil.setTokenService(jobToken, address);
-      taskOwner.addToken(jobToken);
-      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
-        @Override
-        public TezTaskUmbilicalProtocol run() throws Exception {
-          return RPC.getProxy(TezTaskUmbilicalProtocol.class,
-              TezTaskUmbilicalProtocol.versionID, address, defaultConf);
-        }
-      });
-    }
-  }
-  
-  public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
-
-    ContainerContext containerContext = new ContainerContext(containerIdString);
-    ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
-        getTaskMaxSleepTime);
-
-    taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
-        sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
-
-    UserGroupInformation childUGI = null;
-
-    while (!executor.isTerminated()) {
-      if (taskCount > 0) {
-        TezUtilsInternal.updateLoggers("");
-      }
-      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
-      ContainerTask containerTask = null;
-      try {
-        containerTask = getTaskFuture.get();
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        handleError(cause);
-        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-            cause, "Execution Exception while fetching new work: " + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted while waiting for new work:"
-            + containerTask.getTaskSpec().getTaskAttemptID());
-        handleError(e);
-        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
-            "Interrupted while waiting for new work");
-      }
-      if (containerTask.shouldDie()) {
-        LOG.info("ContainerTask returned shouldDie=true, Exiting");
-        shutdown();
-        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
-            "Asked to die by the AM");
-      } else {
-        String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
-        taskCount++;
-        TezUtilsInternal.updateLoggers(loggerAddend);
-        FileSystem.clearStatistics();
-
-        childUGI = handleNewTaskCredentials(containerTask, childUGI);
-        handleNewTaskLocalResources(containerTask);
-        cleanupOnTaskChanged(containerTask);
-
-        // Execute the Actual Task
-        TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
-            localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
-            serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
-            executor, objectRegistry, pid, this.ExecutionContext);
-        boolean shouldDie;
-        try {
-          shouldDie = !taskRunner.run();
-          if (shouldDie) {
-            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
-            shutdown();
-            return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
-                "Asked to die by the AM");
-          }
-        } catch (IOException e) {
-          handleError(e);
-          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-              e, "TaskExecutionFailure: " + e.getMessage());
-        } catch (TezException e) {
-          handleError(e);
-          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
-              e, "TaskExecutionFailure: " + e.getMessage());
-        } finally {
-          FileSystem.closeAllForUGI(childUGI);
-        }
-      }
-    }
-    return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
-        null);
-  }
-
-  /**
-   * Decides which UGI the next task should run under. The current UGI is kept unless the
-   * container task reports changed credentials and actually carries a credentials object; in
-   * that case a new remote-user UGI is created for the user named by the USER environment
-   * variable and the task's credentials are added to it.
-   *
-   * @param containerTask
-   *          the new task specification. Must not be a shouldDie task and must carry a task spec
-   * @param childUGI
-   *          the UGI instance used so far
-   * @return the UGI to use for the new task: either {@code childUGI} or a newly created one
-   */
-  UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
-      UserGroupInformation childUGI) {
-    Preconditions.checkState(!containerTask.shouldDie());
-    Preconditions.checkState(containerTask.getTaskSpec() != null);
-
-    // Unchanged credentials: keep re-using the current UGI.
-    if (!containerTask.haveCredentialsChanged()) {
-      return childUGI;
-    }
-
-    LOG.info("Refreshing UGI since Credentials have changed");
-    Credentials newCredentials = containerTask.getCredentials();
-    if (newCredentials == null) {
-      LOG.info("Not loading any credentials, since no credentials provided");
-      return childUGI;
-    }
-
-    LOG.info("Credentials : #Tokens=" + newCredentials.numberOfTokens() + ", #SecretKeys="
-        + newCredentials.numberOfSecretKeys());
-    String userName = System.getenv(ApplicationConstants.Environment.USER.toString());
-    UserGroupInformation refreshedUGI = UserGroupInformation.createRemoteUser(userName);
-    refreshedUGI.addCredentials(containerTask.getCredentials());
-    return refreshedUGI;
-  }
-
-  /**
-   * Localizes the additional resources that accompany the new task and adds the downloaded
-   * URLs to the classpath.
-   *
-   * @param containerTask
-   *          the task whose additional resources are to be localized
-   * @throws IOException
-   * @throws TezException
-   */
-  private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
-      TezException {
-    Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Additional Resources added to container: " + additionalResources);
-    }
-
-    LOG.info("Localizing additional local resources for Task : " + additionalResources);
-    // The relocalizer works on URIs, so map every resource to its URI first.
-    Function<TezLocalResource, URI> toUri = new Function<TezLocalResource, URI>() {
-      @Override
-      public URI apply(TezLocalResource resource) {
-        return resource.getUri();
-      }
-    };
-    Map<String, URI> resourceUris = Maps.transformValues(additionalResources, toUri);
-    List<URL> downloadedUrls =
-        RelocalizationUtils.processAdditionalResources(resourceUris, defaultConf, workingDir);
-    RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
-
-    LOG.info("Done localizing additional resources");
-    final TaskSpec newTaskSpec = containerTask.getTaskSpec();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("New container task context:" + newTaskSpec.toString());
-    }
-  }
-
-  /**
-   * Clears object-registry entries whose lifecycle has ended and, when the DAG has changed,
-   * replaces the startedInputsMap with a fresh one. The vertex of the new task is then
-   * remembered for the next comparison.
-   *
-   * @param containerTask
-   *          the new task specification. Must be a valid task
-   */
-  private void cleanupOnTaskChanged(ContainerTask containerTask) {
-    Preconditions.checkState(!containerTask.shouldDie());
-    Preconditions.checkState(containerTask.getTaskSpec() != null);
-    TezVertexID incomingVertexID =
-        containerTask.getTaskSpec().getTaskAttemptID().getTaskID().getVertexID();
-
-    // With no previously recorded vertex there is nothing to compare against or clean up.
-    boolean vertexChanged = lastVertexID != null && !lastVertexID.equals(incomingVertexID);
-    if (vertexChanged) {
-      objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX);
-    }
-    boolean dagChanged = lastVertexID != null
-        && !lastVertexID.getDAGId().equals(incomingVertexID.getDAGId());
-    if (dagChanged) {
-      objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.DAG);
-      startedInputsMap = HashMultimap.create();
-    }
-    lastVertexID = incomingVertexID;
-  }
-
-  /**
-   * Tears down the resources held by this TezChild, in order: the executor (via shutdownNow),
-   * the task reporter if one exists, the umbilical RPC proxy, the metrics system and, unless
-   * running in local mode, the log manager.
-   */
-  private void shutdown() {
-    executor.shutdownNow();
-    if (taskReporter != null) {
-      taskReporter.shutdown();
-    }
-    RPC.stopProxy(umbilical);
-    DefaultMetricsSystem.shutdown();
-    // NOTE(review): presumably logging is left alone in local mode because the JVM is shared
-    // with other components - confirm.
-    if (!isLocal) {
-      LogManager.shutdown();
-    }
-  }
-
-  /**
-   * Replaces the umbilical used by this TezChild. A null argument is ignored, leaving the
-   * current umbilical in place.
-   *
-   * @param tezTaskUmbilicalProtocol
-   *          the umbilical to use from now on; ignored when null
-   */
-  public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
-    if (tezTaskUmbilicalProtocol == null) {
-      return;
-    }
-    this.umbilical = tezTaskUmbilicalProtocol;
-  }
-
-  /**
-   * Immutable outcome of a container run: an exit status plus an optional throwable and an
-   * optional error message.
-   */
-  public static class ContainerExecutionResult {
-    /** Exit statuses, each paired with a numeric exit code. */
-    public static enum ExitStatus {
-      SUCCESS(0),
-      EXECUTION_FAILURE(1),
-      INTERRUPTED(2),
-      ASKED_TO_DIE(3);
-
-      private final int exitCode;
-
-      ExitStatus(int exitCode) {
-        this.exitCode = exitCode;
-      }
-
-      /** @return the numeric code associated with this status */
-      public int getExitCode() {
-        return exitCode;
-      }
-    }
-
-    private final ExitStatus exitStatus;
-    private final Throwable throwable;
-    private final String errorMessage;
-
-    /**
-     * @param exitStatus the status of the run
-     * @param throwable the failure cause, or null if there is none
-     * @param errorMessage a description of the outcome, or null if there is none
-     */
-    ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
-                             @Nullable String errorMessage) {
-      this.exitStatus = exitStatus;
-      this.throwable = throwable;
-      this.errorMessage = errorMessage;
-    }
-
-    /** @return the status of the run */
-    public ExitStatus getExitStatus() {
-      return exitStatus;
-    }
-
-    /** @return the failure cause; may be null */
-    public Throwable getThrowable() {
-      return throwable;
-    }
-
-    /** @return the outcome description; may be null */
-    public String getErrorMessage() {
-      return errorMessage;
-    }
-  }
-
-  /**
-   * Factory for a TezChild. Before constructing the instance it applies JVM-wide setup: the
-   * user-specified Tez configuration found via the working directory is added to
-   * {@code conf}, {@code conf} is installed on UserGroupInformation and Limits, the default
-   * metrics system is initialized under the name "TezTask", and a new ObjectRegistryImpl is
-   * created for the child.
-   *
-   * Note that the constructor takes workingDirectory before localDirs, the reverse of this
-   * method's parameter order.
-   *
-   * @param conf configuration to use; passed to addUserSpecifiedTezConfiguration before use
-   * @param host AM host
-   * @param port AM port
-   * @param containerIdentifier identifier of the container this child runs in
-   * @param tokenIdentifier token identifier handed to the TezChild
-   * @param attemptNumber application attempt number
-   * @param localDirs local directories available to the child
-   * @param workingDirectory working directory of the child
-   * @param serviceProviderEnvMap environment map handed to the TezChild
-   * @param pid process id of the JVM, or null
-   * @param ExecutionContext execution context handed to the TezChild
-   * @return the newly constructed TezChild
-   */
-  public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
-      String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
-      Map<String, String> serviceProviderEnvMap, @Nullable String pid,
-      ExecutionContext ExecutionContext)
-      throws IOException, InterruptedException, TezException {
-
-    // Pull in configuration specified for the session.
-    // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
-    // for each and every task, and reading it back from disk. Also needs to be per vertex.
-    TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
-    UserGroupInformation.setConfiguration(conf);
-    Limits.setConfiguration(conf);
-
-    // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
-    // of this class. Leaving it here, till there's some entity representing a running JVM.
-    DefaultMetricsSystem.initialize("TezTask");
-
-    // singleton of ObjectRegistry for this JVM
-    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
-
-    return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
-        attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
-        ExecutionContext);
-  }
-
-  /**
-   * JVM entry point for a Tez child container.
-   *
-   * Expects exactly five arguments, in order: AM host, AM port, container identifier, token
-   * identifier and application attempt number. Local directories, the JVM pid, the working
-   * directory and the NM host are read from the environment (LOCAL_DIRS, JVM_PID, PWD and
-   * NM_HOST respectively).
-   *
-   * @param args the five command-line arguments described above
-   * @throws IllegalArgumentException if the number of arguments is not five
-   * @throws NumberFormatException if the port or attempt number is not an integer
-   */
-  public static void main(String[] args) throws IOException, InterruptedException, TezException {
-
-    final Configuration defaultConf = new Configuration();
-
-    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    LOG.info("TezChild starting");
-
-    // Validate explicitly: an assert is skipped unless the JVM is started with -ea, in which
-    // case a short argument list would only surface later as an ArrayIndexOutOfBoundsException.
-    if (args.length != 5) {
-      throw new IllegalArgumentException("Expected 5 arguments (AM-host, AM-port, "
-          + "containerIdentifier, tokenIdentifier, appAttemptNumber) but got " + args.length);
-    }
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-    final String containerIdentifier = args[2];
-    final String tokenIdentifier = args[3];
-    final int attemptNumber = Integer.parseInt(args[4]);
-    final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
-        .name()));
-    final String pid = System.getenv().get("JVM_PID");
-    LOG.info("PID, containerIdentifier:  " + pid + ", " + containerIdentifier);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
-          + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
-          + " tokenIdentifier: " + tokenIdentifier);
-    }
-    TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
-        tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
-        System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())));
-    tezChild.run();
-  }
-
-  /**
-   * Reacts to a failure by shutting this TezChild down.
-   *
-   * @param t the failure that triggered the shutdown; currently unused by this method
-   */
-  private void handleError(Throwable t) {
-    shutdown();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
deleted file mode 100644
index 6e655f9..0000000
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.ObjectRegistry;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
-
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class TezTaskRunner implements TezUmbilical, ErrorReporter {
-
-  private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
-
-  private final Configuration tezConf;
-  private final LogicalIOProcessorRuntimeTask task;
-  private final UserGroupInformation ugi;
-
-  private final TaskReporter taskReporter;
-  private final ListeningExecutorService executor;
-  private volatile ListenableFuture<Void> taskFuture;
-  private volatile Thread waitingThread;
-  private volatile Throwable firstException;
-
-  // Effectively a duplicate check, since hadFatalError does the same thing.
-  private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
-  private final AtomicBoolean taskRunning;
-  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
-
-  /**
-   * Creates the runtime task for the given spec and registers it, together with this runner,
-   * with the task reporter.
-   *
-   * @throws IOException propagated from the statements in this constructor that declare it
-   */
-  TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
-      TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
-      Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
-      Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
-      ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
-      ExecutionContext ExecutionContext)
-          throws IOException {
-    this.tezConf = tezConf;
-    this.ugi = ugi;
-    this.taskReporter = taskReporter;
-    this.executor = executor;
-    // Assign before `this` is handed out below: addEvents, canCommit and reportError all
-    // dereference taskRunning and could be invoked as soon as the task is registered.
-    this.taskRunning = new AtomicBoolean(true);
-    task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
-        serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
-        ExecutionContext);
-    taskReporter.registerTask(task, this);
-  }
-
-  /**
-   * Submits the task to the executor and blocks the calling thread until it finishes, is
-   * interrupted, or fails. The calling thread is recorded as waitingThread so that other
-   * methods of this class can interrupt it.
-   *
-   * @return false if a shutdown message was received during task execution, true otherwise
-   * @throws InterruptedException if the wait was interrupted and neither a shutdown request nor
-   *           a previously registered exception accounts for it
-   * @throws TezException if the task failed with a TezException, with an Error other than
-   *           FSError, or with a Throwable that is not otherwise rethrown as-is
-   * @throws IOException if the task failed with an IOException
-   */
-  public boolean run() throws InterruptedException, IOException, TezException {
-    waitingThread = Thread.currentThread();
-    TaskRunnerCallable callable = new TaskRunnerCallable();
-    Throwable failureCause = null;
-    taskFuture = executor.submit(callable);
-    try {
-      taskFuture.get();
-
-      // Task could signal a fatal error and return control, or a failure while registering success.
-      failureCause = firstException;
-
-    } catch (InterruptedException e) {
-      LOG.info("Interrupted while waiting for task to complete. Interrupting task");
-      taskFuture.cancel(true);
-      // A shutdown request takes precedence over any registered exception.
-      if (shutdownRequested.get()) {
-        LOG.info("Shutdown requested... returning");
-        return false;
-      }
-      if (firstException != null) {
-        failureCause = firstException;
-      } else {
-        // Interrupted for some other reason.
-        failureCause = e;
-      }
-    } catch (ExecutionException e) {
-      // Exception thrown by the run() method itself.
-      Throwable cause = e.getCause();
-      if (cause instanceof FSError) {
-        // Not immediately fatal, this is an error reported by Hadoop FileSystem
-        failureCause = cause;
-      } else if (cause instanceof Error) {
-        LOG.error("Exception of type Error.", cause);
-        sendFailure(cause, "Fatal Error cause TezChild exit.");
-        throw new TezException("Fatal Error cause TezChild exit.", cause);
-      } else {
-        failureCause = cause;
-      }
-    } finally {
-      // Clear the interrupted status of the blocking thread, in case it is set after the
-      // InterruptedException was invoked.
-      // (Thread.interrupted() below is the call that clears the flag; the task is unregistered
-      // from the reporter first.)
-      taskReporter.unregisterTask(task.getTaskAttemptID());
-      Thread.interrupted();
-    }
-
-    // Rethrow whatever failure was collected above, preserving its type where the method
-    // signature allows it.
-    if (failureCause != null) {
-      if (failureCause instanceof FSError) {
-        // Not immediately fatal, this is an error reported by Hadoop FileSystem
-        LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
-            failureCause);
-        throw (FSError) failureCause;
-      } else if (failureCause instanceof Error) {
-        LOG.error("Exception of type Error.", failureCause);
-        sendFailure(failureCause, "Fatal error cause TezChild exit.");
-        throw new TezException("Fatal error cause TezChild exit.", failureCause);
-      } else {
-        if (failureCause instanceof IOException) {
-          throw (IOException) failureCause;
-        } else if (failureCause instanceof TezException) {
-          throw (TezException) failureCause;
-        } else if (failureCause instanceof InterruptedException) {
-          throw (InterruptedException) failureCause;
-        } else {
-          throw new TezException(failureCause);
-        }
-      }
-    }
-    // The task finished without a failure; still report a shutdown request that arrived
-    // in the meantime.
-    if (shutdownRequested.get()) {
-      LOG.info("Shutdown requested... returning");
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Runs the task's lifecycle (initialize, run, close, cleanup) as the runner's UGI. On
-   * success it reports task success through the task reporter; on failure it registers the
-   * exception, attempts to report the failure, and rethrows. Whatever the outcome,
-   * taskRunning is set to false once the doAs call returns or throws.
-   */
-  private class TaskRunnerCallable implements Callable<Void> {
-
-    @Override
-    public Void call() throws Exception {
-      try {
-        return ugi.doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            try {
-              LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
-              task.initialize();
-              // Skip the run/close phase if this thread was interrupted or an error was
-              // registered while initializing.
-              if (!Thread.currentThread().isInterrupted() && firstException == null) {
-                LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
-                task.run();
-                LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
-                task.close();
-                task.setFrameworkCounters();
-              }
-              LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
-                  + ", fatalErrorOccurred=" + (firstException != null));
-              if (firstException == null) {
-                try {
-                  taskReporter.taskSucceeded(task.getTaskAttemptID());
-                } catch (IOException e) {
-                  LOG.warn("Heartbeat failure caused by communication failure", e);
-                  maybeRegisterFirstException(e);
-                  // Falling off, since the runner thread checks for the registered exception.
-                } catch (TezException e) {
-                  LOG.warn("Heartbeat failure reported by AM", e);
-                  maybeRegisterFirstException(e);
-                  // Falling off, since the runner thread checks for the registered exception.
-                }
-              }
-              return null;
-            } catch (Throwable cause) {
-              if (cause instanceof FSError) {
-                // Not immediately fatal, this is an error reported by Hadoop FileSystem
-                maybeRegisterFirstException(cause);
-                LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
-                    cause);
-                try {
-                  sendFailure(cause, "FS Error in Child JVM");
-                } catch (Exception ignored) {
-                  // Ignored since another cause is already known
-                  LOG.info(
-                      "Ignoring the following exception since a previous exception is already registered",
-                      ignored);
-                }
-                throw (FSError) cause;
-              } else if (cause instanceof Error) {
-                // Unlike the other branches, this one does not register the cause as
-                // firstException, and a failure inside sendFailure propagates instead of
-                // being ignored.
-                LOG.error("Exception of type Error.", cause);
-                sendFailure(cause, "Fatal Error cause TezChild exit.");
-                throw new TezException("Fatal Error cause TezChild exit.", cause);
-              } else {
-                // Unwrap so that the underlying exception is what gets registered and reported.
-                if (cause instanceof UndeclaredThrowableException) {
-                  cause = ((UndeclaredThrowableException) cause).getCause();
-                }
-                maybeRegisterFirstException(cause);
-                LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
-                    cause);
-                try {
-                  sendFailure(cause, "Failure while running task");
-                } catch (Exception ignored) {
-                  // Ignored since another cause is already known
-                  LOG.info(
-                      "Ignoring the following exception since a previous exception is already registered",
-                      ignored);
-                }
-                if (cause instanceof IOException) {
-                  throw (IOException) cause;
-                } else if (cause instanceof TezException) {
-                  throw (TezException) cause;
-                } else {
-                  throw new TezException(cause);
-                }
-              }
-            } finally {
-              task.cleanup();
-            }
-          }
-        });
-      } finally {
-        taskRunning.set(false);
-      }
-    }
-  }
-
-  /**
-   * Marks the task as fatally failed and reports the failure through the task reporter. Only
-   * the first fatal error (as tracked by fatalErrorSent) is reported; later ones are logged
-   * and dropped.
-   *
-   * Should wait until all messages are sent to the AM before TezChild shutdown if this method
-   * becomes async in future.
-   *
-   * @param t the failure cause
-   * @param message diagnostic message accompanying the failure
-   * @throws IOException if reporting the failure fails with an IOException
-   * @throws TezException if reporting the failure fails with a TezException
-   */
-  private void sendFailure(Throwable t, String message) throws IOException, TezException {
-    boolean alreadyReported = fatalErrorSent.getAndSet(true);
-    if (alreadyReported) {
-      LOG.warn("Ignoring fatal error since another error has already been reported", t);
-      return;
-    }
-
-    task.setFatalError(t, message);
-    task.setFrameworkCounters();
-    try {
-      taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
-    } catch (IOException commFailure) {
-      // Logged here and propagated; the caller decides whether it can be ignored.
-      LOG.warn("Heartbeat failure caused by communication failure", commFailure);
-      throw commFailure;
-    } catch (TezException amFailure) {
-      // Logged here and propagated; the caller decides whether it can be ignored.
-      LOG.warn("Heartbeat failure reported by AM", amFailure);
-      throw amFailure;
-    }
-  }
-
-  /**
-   * Forwards events to the task reporter on behalf of the task. Events arriving after the
-   * task has stopped running are dropped.
-   */
-  @Override
-  public void addEvents(Collection<TezEvent> events) {
-    if (!taskRunning.get()) {
-      return;
-    }
-    taskReporter.addEvents(task.getTaskAttemptID(), events);
-  }
-
-  /**
-   * Handles a fatal error signalled for the task. This can arrive before or after the task
-   * itself throws:
-   * - before: the error is registered, reported to the AM exactly once with diagnostics, and
-   *   the waiting thread is interrupted so that it throws the reported error;
-   * - after: another error has already been sent, so the call is a no-op; the waiting thread
-   *   has already returned and run() ends up throwing the earlier exception.
-   */
-  @Override
-  public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
-      String message, EventMetaData sourceInfo) {
-    if (fatalErrorSent.getAndSet(true)) {
-      // Some fatal error was already sent; nothing more to do.
-      return;
-    }
-
-    maybeRegisterFirstException(t);
-    try {
-      taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
-    } catch (IOException heartbeatFailure) {
-      // The task error registered above is the one that matters; the heartbeat failure is
-      // only logged.
-      LOG.warn("Heartbeat failure caused by communication failure", heartbeatFailure);
-    } catch (TezException heartbeatFailure) {
-      // The task error registered above is the one that matters; the heartbeat failure is
-      // only logged.
-      LOG.warn("Heartbeat failure reported by AM", heartbeatFailure);
-    } finally {
-      // Wake up the waiting thread so that it can return control
-      waitingThread.interrupt();
-    }
-  }
-
-  /**
-   * Asks the task reporter whether the given attempt may commit.
-   *
-   * @return the reporter's answer; false if the task is no longer running or if the request
-   *         failed with an IOException (in which case the failure is registered and the
-   *         waiting thread is interrupted)
-   */
-  @Override
-  public boolean canCommit(TezTaskAttemptID taskAttemptID) {
-    if (!taskRunning.get()) {
-      return false;
-    }
-    try {
-      return taskReporter.canCommit(taskAttemptID);
-    } catch (IOException commFailure) {
-      LOG.warn("Communication failure while trying to commit", commFailure);
-      maybeRegisterFirstException(commFailure);
-      waitingThread.interrupt();
-      // Not informing the task since it will be interrupted.
-      // TODO: Should this be sent to the task as well, current Processors, etc do not handle
-      // interrupts very well.
-      return false;
-    }
-  }
-
-  /**
-   * Callback for errors reported by the task reporter. While the task is running the error is
-   * registered as a candidate first exception and the thread blocked in run(), if any, is
-   * interrupted. Once the task has completed the error is only logged.
-   *
-   * @param t the reported error
-   */
-  @Override
-  public synchronized void reportError(Throwable t) {
-    if (taskRunning.get()) {
-      LOG.error("TaskReporter reported error", t);
-      maybeRegisterFirstException(t);
-      // waitingThread is only assigned once run() starts, while this runner is registered
-      // with the reporter in the constructor. If an error is reported in between, there is
-      // nobody to wake up; run() picks up the registered exception later.
-      Thread waiter = waitingThread;
-      if (waiter != null) {
-        waiter.interrupt();
-      }
-      // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
-      // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
-      // method does not throw an exception, in which case task success is registered with the AM.
-      // Leave this handling to the next getTask / actual task.
-    } else {
-      LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
-          + " is already complete");
-    }
-  }
-
-  /**
-   * Records that a shutdown was requested and interrupts the thread blocked in run(), if
-   * any, so that run() can return false.
-   */
-  @Override
-  public void shutdownRequested() {
-    shutdownRequested.set(true);
-    // waitingThread stays null until run() starts; the flag set above is still observed by
-    // run() in that case, so there is nothing to interrupt.
-    Thread waiter = waitingThread;
-    if (waiter != null) {
-      waiter.interrupt();
-    }
-  }
-
-  /**
-   * Builds the diagnostics string sent along with a task failure.
-   *
-   * @param t the failure cause, may be null
-   * @param message an error message, may be null
-   * @return the stack trace and/or message in labelled form, or "Unknown error" when both
-   *         arguments are null
-   */
-  private String getTaskDiagnosticsString(Throwable t, String message) {
-    if (t == null && message == null) {
-      return "Unknown error";
-    }
-    if (t == null) {
-      return " errorMessage=" + message;
-    }
-    String thrown = "exceptionThrown=" + ExceptionUtils.getStackTrace(t);
-    if (message == null) {
-      return thrown;
-    }
-    return thrown + ", errorMessage=" + message;
-  }
-
-  /**
-   * Remembers {@code t} as the first exception unless one has already been registered.
-   *
-   * @param t the exception to register
-   */
-  private synchronized void maybeRegisterFirstException(Throwable t) {
-    if (firstException != null) {
-      return;
-    }
-    firstException = t;
-  }
-
-}


[2/3] tez git commit: TEZ-1933. Move TezChild and related classes into tez-runtime-internals. (sseth)

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
deleted file mode 100644
index b18324a..0000000
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.InputSpec;
-import org.apache.tez.runtime.api.impl.OutputSpec;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-// Tests in this class cannot be run in parallel.
-public class TestTaskExecution {
-
-  private static final Logger LOG = Logger.getLogger(TestTaskExecution.class);
-
-  private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
-
-  private static final Configuration defaultConf = new Configuration();
-  private static final FileSystem localFs;
-  private static final Path workDir;
-
-  private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
-
-  static {
-    defaultConf.set("fs.defaultFS", "file:///");
-    try {
-      localFs = FileSystem.getLocal(defaultConf);
-      Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
-          TestTaskExecution.class.getSimpleName());
-      workDir = localFs.makeQualified(wd);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Resets the TestProcessor state before each test. */
-  @Before
-  public void reset() {
-    TestProcessor.reset();
-  }
-
-  /** Stops the shared taskExecutor once all tests in this class have run. */
-  @AfterClass
-  public static void shutdown() {
-    taskExecutor.shutdownNow();
-  }
-
-  /**
-   * Runs one task with an empty processor configuration and checks that the runner returns
-   * true, that the reporter holds no current callable afterwards, and that the umbilical saw
-   * a task success event.
-   */
-  @Test(timeout = 5000)
-  public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
-      ExecutionException {
-    // Created outside the try block so that the finally clause never dereferences a null
-    // executor, which would mask the original failure with a NullPointerException.
-    ListeningExecutorService executor =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
-    try {
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.signal();
-      boolean result = taskRunnerFuture.get();
-      assertTrue(result);
-      assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskSuccessEvent();
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  /**
-   * Runs two tasks back to back on the same executor, reporter and umbilical, and checks
-   * after each one that the runner returns true, that the reporter holds no current callable,
-   * and that the umbilical saw a task success event.
-   */
-  @Test(timeout = 5000)
-  public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
-      ExecutionException {
-
-    // Created outside the try block so that the finally clause never dereferences a null
-    // executor, which would mask the original failure with a NullPointerException.
-    ListeningExecutorService executor =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
-    try {
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.signal();
-      boolean result = taskRunnerFuture.get();
-      assertTrue(result);
-      assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskSuccessEvent();
-      // Start the second task with a clean event record.
-      umbilical.resetTrackedEvents();
-
-      taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
-      // Setup the executor
-      taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.signal();
-      result = taskRunnerFuture.get();
-      assertTrue(result);
-      assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskSuccessEvent();
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  // test tasked failed due to exception in Processor
-  @Test(timeout = 5000)
-  public void testFailedTask() throws IOException, InterruptedException, TezException {
-
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_THROW_TEZ_EXCEPTION);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.awaitStart();
-      TestProcessor.signal();
-      try {
-        taskRunnerFuture.get();
-        fail("Expecting the task to fail");
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        LOG.info(cause.getClass().getName());
-        assertTrue(cause instanceof TezException);
-      }
-
-      assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezException: TezException");
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  // Test task failed due to Processor class not found
-  @Test(timeout = 5000)
-  public void testFailedTask2() throws IOException, InterruptedException, TezException {
-
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      try {
-        taskRunnerFuture.get();
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        LOG.info(cause.getClass().getName());
-        assertTrue(cause instanceof TezException);
-      }
-      assertNull(taskReporter.currentCallable);
-      umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezUncheckedException: "
-            + "Unable to load class: NotExitedProcessor");
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testHeartbeatException() throws IOException, InterruptedException, TezException {
-
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.awaitStart();
-      umbilical.signalThrowException();
-      umbilical.awaitRegisteredEvent();
-      // Not signaling an actual start to verify task interruption
-      try {
-        taskRunnerFuture.get();
-        fail("Expecting the task to fail");
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        assertTrue(cause instanceof IOException);
-        assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
-      }
-      TestProcessor.awaitCompletion();
-      assertTrue(TestProcessor.wasInterrupted());
-      assertNull(taskReporter.currentCallable);
-      // No completion events since umbilical communication already failed.
-      umbilical.verifyNoCompletionEvents();
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
-      ExecutionException {
-
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
-
-      TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
-      // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
-      // Signal the processor to go through
-      TestProcessor.awaitStart();
-      umbilical.signalSendShouldDie();
-      umbilical.awaitRegisteredEvent();
-      // Not signaling an actual start to verify task interruption
-
-      boolean result = taskRunnerFuture.get();
-      assertFalse(result);
-
-      TestProcessor.awaitCompletion();
-      assertTrue(TestProcessor.wasInterrupted());
-      assertNull(taskReporter.currentCallable);
-      // TODO Is this statement correct ?
-      // No completion events since shouldDie was requested by the AM, which should have killed the
-      // task.
-      umbilical.verifyNoCompletionEvents();
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      ContainerContext containerContext = new ContainerContext(containerId.toString());
-
-      ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
-      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
-
-      getTaskFuture.get();
-      assertEquals(1, umbilical.getTaskInvocations);
-
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
-  // Potential new tests
-  // Different states - initialization failure, close failure
-  // getTask states
-
-  private static class TaskRunnerCallable implements Callable<Boolean> {
-    private final TezTaskRunner taskRunner;
-
-    public TaskRunnerCallable(TezTaskRunner taskRunner) {
-      this.taskRunner = taskRunner;
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-      return taskRunner.run();
-    }
-  }
-
-  // Uses static fields for signaling. Ensure only used by one test at a time.
-  public static class TestProcessor extends SimpleProcessor {
-
-    public static final byte[] CONF_EMPTY = new byte[] { 0 };
-    public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
-    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
-
-    private static final Logger LOG = Logger.getLogger(TestProcessor.class);
-
-    private static final ReentrantLock processorLock = new ReentrantLock();
-    private static final Condition processorCondition = processorLock.newCondition();
-    private static final Condition completionCondition = processorLock.newCondition();
-    private static final Condition runningCondition = processorLock.newCondition();
-    private static boolean completed = false;
-    private static boolean running = false;
-    private static boolean signalled = false;
-
-    public static boolean receivedInterrupt = false;
-
-    private boolean throwIOException = false;
-    private boolean throwTezException = false;
-    private boolean signalFatalAndThrow = false;
-    private boolean signalFatalAndLoop = false;
-    private boolean signalFatalAndComplete = false;
-
-    public TestProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void initialize() throws Exception {
-      parseConf(getContext().getUserPayload().deepCopyAsArray());
-    }
-
-    private void parseConf(byte[] bytes) {
-      byte b = bytes[0];
-      throwIOException = (b & 1) > 1;
-      throwTezException = (b & 2) > 1;
-      signalFatalAndThrow = (b & 4) > 1;
-      signalFatalAndLoop = (b & 8) > 1;
-      signalFatalAndComplete = (b & 16) > 1;
-    }
-
-    public static void reset() {
-      signalled = false;
-      receivedInterrupt = false;
-      completed = false;
-      running = false;
-    }
-
-    public static void signal() {
-      LOG.info("Signalled");
-      processorLock.lock();
-      try {
-        signalled = true;
-        processorCondition.signal();
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static void awaitStart() throws InterruptedException {
-      LOG.info("Awaiting Process run");
-      processorLock.lock();
-      try {
-        if (running) {
-          return;
-        }
-        runningCondition.await();
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static void awaitCompletion() throws InterruptedException {
-      LOG.info("Await completion");
-      processorLock.lock();
-      try {
-        if (completed) {
-          return;
-        } else {
-          completionCondition.await();
-        }
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static boolean wasInterrupted() {
-      processorLock.lock();
-      try {
-        return receivedInterrupt;
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    @Override
-    public void run() throws Exception {
-      processorLock.lock();
-      running = true;
-      runningCondition.signal();
-      try {
-        try {
-          LOG.info("Signal is: " + signalled);
-          if (!signalled) {
-            LOG.info("Waiting for processor signal");
-            processorCondition.await();
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedException();
-          }
-          LOG.info("Received processor signal");
-          if (throwIOException) {
-            throw new IOException();
-          } else if (throwTezException) {
-            throw new TezException("TezException");
-          } else if (signalFatalAndThrow) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            throw io;
-          } else if (signalFatalAndComplete) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            return;
-          } else if (signalFatalAndLoop) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            LOG.info("Waiting for Processor signal again");
-            processorCondition.await();
-            LOG.info("Received second processor signal");
-          }
-        } catch (InterruptedException e) {
-          receivedInterrupt = true;
-        }
-      } finally {
-        completed = true;
-        completionCondition.signal();
-        processorLock.unlock();
-      }
-    }
-  }
-
-  private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
-
-    private static final Logger LOG = Logger.getLogger(TezTaskUmbilicalForTest.class);
-
-    private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
-
-    private final ReentrantLock umbilicalLock = new ReentrantLock();
-    private final Condition eventCondition = umbilicalLock.newCondition();
-    private boolean pendingEvent = false;
-    private boolean eventEnacted = false;
-
-    volatile int getTaskInvocations = 0;
-
-    private boolean shouldThrowException = false;
-    private boolean shouldSendDieSignal = false;
-
-    public void signalThrowException() {
-      umbilicalLock.lock();
-      try {
-        shouldThrowException = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void signalSendShouldDie() {
-      umbilicalLock.lock();
-      try {
-        shouldSendDieSignal = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void awaitRegisteredEvent() throws InterruptedException {
-      umbilicalLock.lock();
-      try {
-        if (eventEnacted) {
-          return;
-        }
-        LOG.info("Awaiting event");
-        eventCondition.await();
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void resetTrackedEvents() {
-      umbilicalLock.lock();
-      try {
-        requestEvents.clear();
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyNoCompletionEvents() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            fail("Found a TaskAttemptFailedEvent when not expected");
-          }
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            fail("Found a TaskAttemptCompletedvent when not expected");
-          }
-        }
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyTaskFailedEvent(String diagnostics) {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent();
-            if(failedEvent.getDiagnostics().startsWith(diagnostics)){
-              return ;
-            } else {
-              fail("No detailed diagnostics message in TaskAttemptFailedEvent");
-            }
-          }
-        }
-        fail("No TaskAttemptFailedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyTaskSuccessEvent() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            return;
-          }
-        }
-        fail("No TaskAttemptFailedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    @Override
-    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-      return 0;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-        int clientMethodsHash) throws IOException {
-      return null;
-    }
-
-    @Override
-    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
-      // Return shouldDie = true
-      getTaskInvocations++;
-      return new ContainerTask(null, true, null, null, false);
-    }
-
-    @Override
-    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    @Override
-    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
-        TezException {
-      umbilicalLock.lock();
-      if (request.getEvents() != null) {
-        requestEvents.addAll(request.getEvents());
-      }
-      try {
-        if (shouldThrowException) {
-          LOG.info("TestUmbilical throwing Exception");
-          throw new IOException(HEARTBEAT_EXCEPTION_STRING);
-        }
-        TezHeartbeatResponse response = new TezHeartbeatResponse();
-        response.setLastRequestId(request.getRequestId());
-        if (shouldSendDieSignal) {
-          LOG.info("TestUmbilical returning shouldDie=true");
-          response.setShouldDie();
-        }
-        return response;
-      } finally {
-        if (pendingEvent) {
-          eventEnacted = true;
-          LOG.info("Signalling Event");
-          eventCondition.signal();
-        }
-        umbilicalLock.unlock();
-      }
-    }
-  }
-
-  private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
-    TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
-        createContainerId(appId).toString());
-    return taskReporter;
-  }
-
-  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
-      TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
-      throws IOException {
-    return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
-        processorConf);
-  }
-
-  private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
-      TaskReporter taskReporter, ListeningExecutorService executor, String processorClass, byte[] processorConf) throws IOException{
-    TezConfiguration tezConf = new TezConfiguration(defaultConf);
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    Path testDir = new Path(workDir, UUID.randomUUID().toString());
-    String[] localDirs = new String[] { testDir.toString() };
-
-    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-    TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
-    TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
-    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
-    ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
-        .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
-    TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
-        new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
-
-    TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1,
-        new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter,
-        executor, null, "", new ExecutionContextImpl("localhost"));
-    return taskRunner;
-  }
-
-  private ContainerId createContainerId(ApplicationId appId) {
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-    return containerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
deleted file mode 100644
index de03307..0000000
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.task;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Lists;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class TestTaskReporter {
-
-  @Test(timeout = 10000)
-  public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
-
-    final Object lock = new Object();
-    final AtomicBoolean hb2Done = new AtomicBoolean(false);
-
-    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
-        if (request.getRequestId() == 1 || request.getRequestId() == 2) {
-          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
-          response.setLastRequestId(request.getRequestId());
-          return response;
-        } else if (request.getRequestId() == 3) {
-          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
-          response.setLastRequestId(request.getRequestId());
-          synchronized (lock) {
-            hb2Done.set(true);
-            lock.notify();
-          }
-          return response;
-        } else {
-          throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
-        }
-      }
-    }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
-
-    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
-    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
-    doReturn("vertexName").when(mockTask).getVertexName();
-    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
-
-    // Setup the sleep time to be way higher than the test timeout
-    TaskReporter.HeartbeatCallable heartbeatCallable =
-        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
-            new AtomicLong(0),
-            "containerIdStr");
-
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    executor.submit(heartbeatCallable);
-    try {
-      synchronized (lock) {
-        if (!hb2Done.get()) {
-          lock.wait();
-        }
-      }
-      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
-      Thread.sleep(2000l);
-      // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
-      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
-    } finally {
-      executor.shutdownNow();
-    }
-
-  }
-
-  private List<TezEvent> createEvents(int numEvents) {
-    List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
-    for (int i = 0; i < numEvents; i++) {
-      list.add(mock(TezEvent.class));
-    }
-    return list;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
new file mode 100644
index 0000000..a68c7c1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+
+/**
+ * Responsible for communication between a running Container and the ApplicationMaster. The main
+ * functionality is to poll for new tasks: {@link #call()} keeps invoking getTask on the umbilical
+ * until a non-null {@link ContainerTask} comes back, sleeping between attempts.
+ */
+public class ContainerReporter implements Callable<ContainerTask> {
+
+  private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
+  /** Spacing, in milliseconds, applied when scheduling the next "sleeping" log message. */
+  private static final long LOG_INTERVAL = 2000L;
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final ContainerContext containerContext;
+  private final int getTaskMaxSleepTime;
+
+  private long nextGetTaskPrintTime;
+
+  /** getTaskMaxSleepTime caps, in milliseconds, the sleep between two getTask attempts. */
+  ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
+      int getTaskMaxSleepTime) {
+    this.umbilical = umbilical;
+    this.containerContext = containerContext;
+    this.getTaskMaxSleepTime = getTaskMaxSleepTime;
+  }
+
+  /** Polls getTask until non-null, sleeping 10ms, 20ms, ... (capped) between attempts. */
+  @Override
+  public ContainerTask call() throws Exception {
+    LOG.info("Attempting to fetch new task");
+    ContainerTask containerTask = umbilical.getTask(containerContext);
+    long getTaskPollStartTime = System.currentTimeMillis();
+    nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
+    for (int idle = 1; containerTask == null; idle++) {
+      long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
+      maybeLogSleepMessage(sleepTimeMilliSecs);
+      TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
+      containerTask = umbilical.getTask(containerContext);
+    }
+    LOG.info("Got TaskUpdate: " + (System.currentTimeMillis() - getTaskPollStartTime)
+        + " ms after starting to poll." + " TaskInfo: shouldDie: " + containerTask.shouldDie()
+        + (containerTask.shouldDie() ? "" : ", currentTaskAttemptId: "
+            + containerTask.getTaskSpec().getTaskAttemptID()));
+    return containerTask;
+  }
+
+  /** Logs only if the coming sleep ends after nextGetTaskPrintTime, then advances that time. */
+  private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
+    long currentTime = System.currentTimeMillis();
+    if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
+      LOG.info("Sleeping for " + sleepTimeMilliSecs
+          + "ms before retrying getTask again. Got null now. "
+          + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
+      nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
new file mode 100644
index 0000000..1146ce4
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+public interface ErrorReporter {
+  /** Callback receiving a Throwable; presumably an error hit while running a task (confirm). */
+  void reportError(Throwable t);
+  /** Callback taking no arguments; by its name, signals that a shutdown was requested. */
+  void shutdownRequested();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
new file mode 100644
index 0000000..15dcbb0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
+ * retrieve events specific to this task.
+ * 
+ */
+public class TaskReporter {
+
+  private static final Logger LOG = Logger.getLogger(TaskReporter.class);
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  /**
+   * Creates a reporter which sends heartbeats over the given umbilical, using a dedicated
+   * single daemon thread named "TaskHeartbeatThread".
+   *
+   * @param umbilical protocol instance used to talk to the AM
+   * @param amPollInterval time to wait between regular heartbeats, in milliseconds
+   * @param sendCounterInterval minimum interval between sending counters to the AM
+   * @param maxEventsToGet maximum number of events requested per heartbeat
+   * @param requestCounter source of heartbeat request ids
+   * @param containerIdStr identifier of the container, included in each heartbeat request
+   */
+  public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
+      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+    this.umbilical = umbilical;
+    this.containerIdStr = containerIdStr;
+    this.requestCounter = requestCounter;
+    this.maxEventsToGet = maxEventsToGet;
+    this.sendCounterInterval = sendCounterInterval;
+    this.pollInterval = amPollInterval;
+
+    ExecutorService singleThreadPool = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setNameFormat("TaskHeartbeatThread")
+            .setDaemon(true)
+            .build());
+    this.heartbeatExecutor = MoreExecutors.listeningDecorator(singleThreadPool);
+  }
+
+  /**
+   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
+   */
+  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
+      ErrorReporter errorReporter) {
+    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr);
+    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * This method should always be invoked before setting up heartbeats for another task running in
+   * the same container.
+   */
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    currentCallable.markComplete();
+    currentCallable = null;
+  }
+  
+  /**
+   * Shuts down the heartbeat executor immediately via {@code shutdownNow()}.
+   */
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final LogicalIOProcessorRuntimeTask task;
+    private EventMetaData updateEventMetadata;
+
+    private final TezTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
+     * log counters.
+     */
+    private int nonOobHeartbeatCounter = 0;
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM. 
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    /**
+     * Sets up the heartbeat loop state for a single task.
+     *
+     * @param task the task on whose behalf heartbeats are sent
+     * @param umbilical protocol instance used to talk to the AM
+     * @param amPollInterval time to wait between regular heartbeats, in milliseconds
+     * @param sendCounterInterval minimum interval between sending counters to the AM
+     * @param maxEventsToGet maximum number of events requested per heartbeat
+     * @param requestCounter source of heartbeat request ids
+     * @param containerIdStr identifier of the container, included in each heartbeat request
+     */
+    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
+        TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
+        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+
+      this.task = task;
+      this.umbilical = umbilical;
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      // First counter log happens after roughly LOG_COUNTER_START_INTERVAL worth of regular
+      // heartbeats, but never before the first one. A zero poll interval is replaced by a tiny
+      // positive value for the division.
+      float divisor = (amPollInterval == 0) ? 0.000001f : (float) amPollInterval;
+      int heartbeatsUntilFirstLog = (int) (LOG_COUNTER_START_INTERVAL / divisor);
+      nextHeartbeatNumToLog = Math.max(1, heartbeatsUntilFirstLog);
+    }
+
+    /**
+     * Runs the heartbeat loop until the task is done, has hit a fatal error, or the AM asks
+     * the task to die.
+     *
+     * @return {@code false} if the AM asked the task to die, {@code true} if the loop ended
+     *         because the task completed or failed
+     */
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!(task.isTaskDone() || task.hadFatalError())) {
+        ResponseWrapper response = heartbeat(null);
+
+        if (response.shouldDie) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        }
+
+        if (response.numEvents >= maxEventsToGet) {
+          // A full batch of events was received: heartbeat again right away (treated as OOB).
+          continue;
+        }
+
+        // Wait before sending another heartbeat.
+        lock.lock();
+        try {
+          // await() returns false only when the full poll interval elapsed; only those
+          // heartbeats count as regular (non-OOB) ones.
+          boolean wokenEarly = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+          if (!wokenEarly) {
+            nonOobHeartbeatCounter++;
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      int unsentEvents = eventsToSend.size();
+      if (unsentEvents > 0) {
+        LOG.warn("Exiting TaskReporter thread with pending queue size=" + unsentEvents);
+      }
+      return true;
+    }
+
+    /**
+     * Sends a single heartbeat to the AM and processes the response.
+     * <p>
+     * The request carries everything queued in {@code eventsToSend} (after adding
+     * {@code eventsArg} to it) and, while the task is neither done nor failed, a status update
+     * event. Events returned by the AM are handed to the task unless the task has already
+     * completed or failed.
+     *
+     * @param eventsArg
+     *          additional events to send with this heartbeat; may be null
+     * @return a wrapper holding the shouldDie flag from the AM and the number of events routed
+     *         to the task
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM, or that the request id echoed by
+     *           the AM does not match the one sent.
+     */
+    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+        TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      TezEvent updateEvent = null;
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      // Move everything queued so far into this request.
+      eventsToSend.drainTo(events);
+
+      if (!task.isTaskDone() && !task.hadFatalError()) {
+        TezCounters counters = null;
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
+         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
+         * real time decisions are made based on these counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+        }
+        // counters stays null (progress only) when the counter interval has not elapsed yet.
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
+            updateEventMetadata);
+        events.add(updateEvent);
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
+          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return new ResponseWrapper(true, 1);
+      }
+      // The AM is expected to echo the id of the request it is answering.
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
+      // are running using the same umbilical.
+      int numEventsReceived = 0;
+      if (task.isTaskDone() || task.hadFatalError()) {
+        // The task finished while the heartbeat was in flight: drop whatever the AM sent.
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          LOG.warn("Current task already complete, Ignoring all event in"
+              + " heartbeat response, eventCount=" + response.getEvents().size());
+        }
+      } else {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+          }
+          // This should ideally happen in a separate thread
+          numEventsReceived = response.getEvents().size();
+          task.handleEvents(response.getEvents());
+        }
+      }
+      return new ResponseWrapper(false, numEventsReceived);
+    }
+
+    /**
+     * Signals the condition the heartbeat loop waits on between heartbeats, so that a loop
+     * currently waiting out the poll interval wakes up immediately.
+     */
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Logs the task counters at debug level when the regular (non-OOB) heartbeat count reaches
+     * the next scheduled logging point, then pushes that point further out by
+     * {@code LOG_COUNTER_BACKOFF}.
+     */
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled() && nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+        LOG.debug("Counters: " + task.getCounters().toShortString());
+        // Truncating the scaled value leaves small numbers unchanged (e.g. (int) (1 * 1.3f) is
+        // still 1), which would stop all further logging. Always advance by at least one.
+        int scaled = (int) (nextHeartbeatNumToLog * LOG_COUNTER_BACKOFF);
+        nextHeartbeatNumToLog = Math.max(nextHeartbeatNumToLog + 1, scaled);
+      }
+    }
+
+    /**
+     * Sends the final heartbeat for a successful task: a status update carrying the current
+     * counters and progress, followed by the attempt-completed event.
+     *
+     * @param taskAttemptID
+     *          not used by this method
+     * @return true unless the AM responded with shouldDie
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+      TaskStatusUpdateEvent finalStatus = new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress());
+      TezEvent statusUpdateEvent = new TezEvent(finalStatus, updateEventMetadata);
+      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+          updateEventMetadata);
+
+      List<TezEvent> finalEvents = Lists.newArrayList(statusUpdateEvent, taskCompletedEvent);
+      ResponseWrapper response = heartbeat(finalEvents);
+      return !response.shouldDie;
+    }
+
+    /**
+     * Sends the final heartbeat for a failed task: a status update carrying the current
+     * counters and progress, followed by the attempt-failed event.
+     *
+     * @param taskAttemptID
+     *          not used by this method
+     * @param t
+     *          the cause of the failure; may be null, in which case only {@code diagnostics}
+     *          is reported
+     * @param diagnostics
+     *          optional message; when both it and {@code t} are present the stack trace of
+     *          {@code t} is appended after a ':'
+     * @param srcMeta
+     *          metadata identifying the source of the failure; the task-level metadata is used
+     *          when null
+     * @return true unless the AM responded with shouldDie
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+        EventMetaData srcMeta) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      // A failure may be reported with a message only. ExceptionUtils.getStackTrace(null) would
+      // throw a NullPointerException and the failure would never reach the AM.
+      if (t != null) {
+        String stackTrace = ExceptionUtils.getStackTrace(t);
+        diagnostics = (diagnostics == null) ? stackTrace : diagnostics + ":" + stackTrace;
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+    }
+
+    /**
+     * Queues events to be sent with a later heartbeat. Null or empty collections are ignored.
+     *
+     * @param taskAttemptID not used by this method
+     * @param events the events to queue
+     */
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events == null || events.isEmpty()) {
+        return;
+      }
+      eventsToSend.addAll(events);
+    }
+  }
+
+  /**
+   * Translates the outcome of a heartbeat loop into calls on an {@link ErrorReporter}.
+   */
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter reporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      reporter = errorReporter;
+    }
+
+    /**
+     * A {@code false} result means the loop stopped because a shutdown was requested.
+     */
+    @Override
+    public void onSuccess(Boolean result) {
+      if (!result) {
+        reporter.shutdownRequested();
+      }
+    }
+
+    /**
+     * Any exception thrown by the heartbeat loop is forwarded as an error.
+     */
+    @Override
+    public void onFailure(Throwable t) {
+      reporter.reportError(t);
+    }
+  }
+
+  /**
+   * Sends the final events for a successful task through the currently registered task's
+   * heartbeat callable. A task must be registered when this is called.
+   *
+   * @param taskAttemptID passed through to the callable
+   * @return true unless the AM responded with shouldDie
+   * @throws IOException indicates an RPC communication failure
+   * @throws TezException indicates an exception somewhere in the AM
+   */
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    return currentCallable.taskSucceeded(taskAttemptID);
+  }
+
+  /**
+   * Sends the final events for a failed task through the currently registered task's
+   * heartbeat callable. A task must be registered when this is called.
+   *
+   * @param taskAttemptID passed through to the callable
+   * @param t the cause of the failure
+   * @param diagnostics optional diagnostic message
+   * @param srcMeta optional metadata identifying the source of the failure
+   * @return true unless the AM responded with shouldDie
+   * @throws IOException indicates an RPC communication failure
+   * @throws TezException indicates an exception somewhere in the AM
+   */
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+      EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  /**
+   * Queues events on the currently registered task's heartbeat callable so that they are sent
+   * with a later heartbeat. A task must be registered when this is called.
+   *
+   * @param taskAttemptID passed through to the callable
+   * @param events the events to queue
+   */
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    currentCallable.addEvents(taskAttemptID, events);
+  }
+
+  /**
+   * Forwards a canCommit query for the given attempt to the umbilical.
+   *
+   * @param taskAttemptID the attempt asking to commit
+   * @return the answer returned by the umbilical
+   * @throws IOException if the umbilical call fails
+   */
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    return umbilical.canCommit(taskAttemptID);
+  }
+
+  /**
+   * Outcome of a single heartbeat: whether the AM asked the task to die, and how many events
+   * came back with the response.
+   */
+  private static final class ResponseWrapper {
+    boolean shouldDie;
+    int numEvents;
+
+    private ResponseWrapper(boolean die, int eventCount) {
+      shouldDie = die;
+      numEvents = eventCount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e250983e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
new file mode 100644
index 0000000..c0843a0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TezChild {
+
+  private static final Logger LOG = Logger.getLogger(TezChild.class);
+
+  private final Configuration defaultConf;
+  private final String containerIdString;
+  private final int appAttemptNumber;
+  private final String[] localDirs;
+
+  private final AtomicLong heartbeatCounter = new AtomicLong(0);
+
+  private final int getTaskMaxSleepTime;
+  private final int amHeartbeatInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final boolean isLocal;
+  private final String workingDir;
+
+  private final ListeningExecutorService executor;
+  private final ObjectRegistryImpl objectRegistry;
+  private final String pid;
+  private final ExecutionContext ExecutionContext;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+  private final Map<String, String> serviceProviderEnvMap;
+
+  private Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+  private TaskReporter taskReporter;
+  private TezTaskUmbilicalProtocol umbilical;
+  private int taskCount = 0;
+  private TezVertexID lastVertexID;
+
+  /**
+   * Reads the task-related settings from the configuration, creates the single-threaded
+   * executor used by this container and, unless running in local mode, opens the umbilical
+   * RPC proxy to the given host and port as the task owner.
+   */
+  public TezChild(Configuration conf, String host, int port, String containerIdentifier,
+      String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
+      Map<String, String> serviceProviderEnvMap,
+      ObjectRegistryImpl objectRegistry, String pid,
+      ExecutionContext ExecutionContext)
+      throws IOException, InterruptedException {
+    this.defaultConf = conf;
+    this.containerIdString = containerIdentifier;
+    this.appAttemptNumber = appAttemptNumber;
+    this.workingDir = workingDir;
+    this.localDirs = localDirs;
+    this.serviceProviderEnvMap = serviceProviderEnvMap;
+    this.pid = pid;
+    this.ExecutionContext = ExecutionContext;
+
+    // Polling / heartbeat related settings.
+    this.getTaskMaxSleepTime = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
+    this.amHeartbeatInterval = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    this.sendCounterInterval = defaultConf.getLong(
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
+    this.maxEventsToGet = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
+    ExecutorService singleThreadPool = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezChild").build());
+    this.executor = MoreExecutors.listeningDecorator(singleThreadPool);
+
+    this.objectRegistry = objectRegistry;
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials currentCredentials = UserGroupInformation.getCurrentUser().getCredentials();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing with tokens:");
+      for (Token<?> token : currentCredentials.getAllTokens()) {
+        LOG.debug(token);
+      }
+    }
+
+    this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(currentCredentials);
+
+    // The job token is made available to service consumers under the shuffle handler id.
+    serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+        TezCommonUtils.convertJobTokenToBytes(jobToken));
+
+    if (isLocal) {
+      // No RPC proxy is created in local mode.
+      return;
+    }
+
+    final InetSocketAddress amAddress = NetUtils.createSocketAddrForHost(host, port);
+    SecurityUtil.setTokenService(jobToken, amAddress);
+    taskOwner.addToken(jobToken);
+    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, amAddress, defaultConf);
+      }
+    });
+  }
+  
+  /**
+   * Main loop of the container: repeatedly asks for a new task and runs it, until asked to
+   * die, until an error occurs, or until the executor has terminated.
+   *
+   * @return the outcome of the container: SUCCESS when asked to die by the AM or when the
+   *         executor has terminated, EXECUTION_FAILURE when fetching or running a task failed,
+   *         INTERRUPTED when interrupted while waiting for new work
+   */
+  public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
+
+    ContainerContext containerContext = new ContainerContext(containerIdString);
+    ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
+        getTaskMaxSleepTime);
+
+    taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
+        sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
+
+    UserGroupInformation childUGI = null;
+
+    while (!executor.isTerminated()) {
+      if (taskCount > 0) {
+        TezUtilsInternal.updateLoggers("");
+      }
+      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+      ContainerTask containerTask = null;
+      try {
+        containerTask = getTaskFuture.get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        handleError(cause);
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            cause, "Execution Exception while fetching new work: " + e.getMessage());
+      } catch (InterruptedException e) {
+        // No task has been received yet at this point (containerTask is still null), so there
+        // is no attempt id to log; dereferencing it would throw a NullPointerException.
+        LOG.info("Interrupted while waiting for new work");
+        handleError(e);
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
+            "Interrupted while waiting for new work");
+      }
+      if (containerTask.shouldDie()) {
+        LOG.info("ContainerTask returned shouldDie=true, Exiting");
+        shutdown();
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+            "Asked to die by the AM");
+      } else {
+        String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
+        taskCount++;
+        TezUtilsInternal.updateLoggers(loggerAddend);
+        FileSystem.clearStatistics();
+
+        childUGI = handleNewTaskCredentials(containerTask, childUGI);
+        handleNewTaskLocalResources(containerTask);
+        cleanupOnTaskChanged(containerTask);
+
+        // Execute the Actual Task
+        TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
+            localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
+            serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
+            executor, objectRegistry, pid, this.ExecutionContext);
+        boolean shouldDie;
+        try {
+          shouldDie = !taskRunner.run();
+          if (shouldDie) {
+            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+            shutdown();
+            return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+                "Asked to die by the AM");
+          }
+        } catch (IOException e) {
+          handleError(e);
+          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+              e, "TaskExecutionFailure: " + e.getMessage());
+        } catch (TezException e) {
+          handleError(e);
+          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+              e, "TaskExecutionFailure: " + e.getMessage());
+        } finally {
+          // Close the file systems cached for this task's UGI once the task has finished.
+          FileSystem.closeAllForUGI(childUGI);
+        }
+      }
+    }
+    return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+        null);
+  }
+
+  /**
+   * Decides which UGI the new task runs with. The existing UGI is re-used unless the task
+   * reports changed credentials and actually provides some; in that case a fresh remote-user
+   * UGI is created for the user named by the USER environment variable and the task's
+   * credentials are added to it.
+   * 
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   * @param childUGI
+   *          the old UGI instance being used
+   * @return the UGI to use for the new task
+   */
+  UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
+      UserGroupInformation childUGI) {
+    Preconditions.checkState(!containerTask.shouldDie());
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+
+    if (!containerTask.haveCredentialsChanged()) {
+      // Nothing changed: keep using the current UGI.
+      return childUGI;
+    }
+
+    LOG.info("Refreshing UGI since Credentials have changed");
+    Credentials taskCreds = containerTask.getCredentials();
+    if (taskCreds == null) {
+      LOG.info("Not loading any credentials, since no credentials provided");
+      return childUGI;
+    }
+
+    LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+        + taskCreds.numberOfSecretKeys());
+    String userName = System.getenv(ApplicationConstants.Environment.USER.toString());
+    UserGroupInformation refreshedUGI = UserGroupInformation.createRemoteUser(userName);
+    refreshedUGI.addCredentials(containerTask.getCredentials());
+    return refreshedUGI;
+  }
+
+  /**
+   * Localizes the additional resources that come with the new task and adds the downloaded
+   * URLs to the classpath.
+   * 
+   * @param containerTask
+   *          the new task, providing the additional resources
+   * @throws IOException
+   * @throws TezException
+   */
+  private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
+      TezException {
+    Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Additional Resources added to container: " + additionalResources);
+    }
+
+    LOG.info("Localizing additional local resources for Task : " + additionalResources);
+    // Only the URI of each resource is needed for localization.
+    Function<TezLocalResource, URI> resourceToUri = new Function<TezLocalResource, URI>() {
+      @Override
+      public URI apply(TezLocalResource input) {
+        return input.getUri();
+      }
+    };
+    Map<String, URI> resourceUris = Maps.transformValues(additionalResources, resourceToUri);
+    List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(resourceUris,
+        defaultConf, workingDir);
+    RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+    LOG.info("Done localizing additional resources");
+
+    final TaskSpec taskSpec = containerTask.getTaskSpec();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("New container task context:" + taskSpec.toString());
+    }
+  }
+
+  /**
+   * Clears state left behind by the previous task when the new task belongs to a different
+   * vertex or DAG: vertex-scoped registry entries are dropped on a vertex change, DAG-scoped
+   * entries and the startedInputsMap on a DAG change. The first task of the container has
+   * nothing to clean up.
+   * 
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   */
+  private void cleanupOnTaskChanged(ContainerTask containerTask) {
+    Preconditions.checkState(!containerTask.shouldDie());
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+
+    TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
+        .getVertexID();
+    TezVertexID previousVertexID = lastVertexID;
+
+    if (previousVertexID != null) {
+      boolean vertexChanged = !previousVertexID.equals(newVertexID);
+      if (vertexChanged) {
+        objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.VERTEX);
+      }
+      boolean dagChanged = !previousVertexID.getDAGId().equals(newVertexID.getDAGId());
+      if (dagChanged) {
+        objectRegistry.clearCache(ObjectRegistryImpl.ObjectLifeCycle.DAG);
+        startedInputsMap = HashMultimap.create();
+      }
+    }
+    lastVertexID = newVertexID;
+  }
+
+  /**
+   * Stops the executor, the task reporter (if one was created), the umbilical proxy and the
+   * metrics system. The logging system is shut down as well, except in local mode.
+   */
+  private void shutdown() {
+    executor.shutdownNow();
+    if (taskReporter != null) {
+      taskReporter.shutdown();
+    }
+    RPC.stopProxy(umbilical);
+    DefaultMetricsSystem.shutdown();
+    if (isLocal) {
+      return;
+    }
+    LogManager.shutdown();
+  }
+
+  /**
+   * Replaces the umbilical used by this instance. A null argument is ignored.
+   *
+   * @param tezTaskUmbilicalProtocol the umbilical to use from now on
+   */
+  public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
+    if (tezTaskUmbilicalProtocol == null) {
+      return;
+    }
+    this.umbilical = tezTaskUmbilicalProtocol;
+  }
+
+  /**
+   * Outcome of running a container: an exit status plus, optionally, the error that caused it
+   * and a descriptive message.
+   */
+  public static class ContainerExecutionResult {
+
+    /** Possible outcomes, each mapped to a numeric exit code. */
+    public enum ExitStatus {
+      SUCCESS(0),
+      EXECUTION_FAILURE(1),
+      INTERRUPTED(2),
+      ASKED_TO_DIE(3);
+
+      private final int exitCode;
+
+      ExitStatus(int code) {
+        exitCode = code;
+      }
+
+      /** @return the numeric exit code associated with this status */
+      public int getExitCode() {
+        return exitCode;
+      }
+    }
+
+    private final ExitStatus exitStatus;
+    private final Throwable throwable;
+    private final String errorMessage;
+
+    ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
+                             @Nullable String errorMessage) {
+      this.exitStatus = exitStatus;
+      this.throwable = throwable;
+      this.errorMessage = errorMessage;
+    }
+
+    /** @return the exit status of the container */
+    public ExitStatus getExitStatus() {
+      return exitStatus;
+    }
+
+    /** @return the error behind the result; may be null */
+    public Throwable getThrowable() {
+      return throwable;
+    }
+
+    /** @return the message describing the result; may be null */
+    public String getErrorMessage() {
+      return errorMessage;
+    }
+  }
+
+  /**
+   * Prepares process-wide state and builds a {@link TezChild}.
+   *
+   * <p>Before constructing the instance this method merges the user-specified Tez configuration
+   * found via {@code workingDirectory} into {@code conf}, applies {@code conf} to
+   * {@link UserGroupInformation} and {@link Limits}, initializes the default metrics system under
+   * the name "TezTask", and creates a new {@link ObjectRegistryImpl} that is handed to the
+   * returned instance.
+   *
+   * @param conf configuration to augment and pass on; it is modified by this call
+   * @param host host passed through to the {@link TezChild} constructor
+   * @param port port passed through to the {@link TezChild} constructor
+   * @param containerIdentifier container identifier passed through to the constructor
+   * @param tokenIdentifier token identifier passed through to the constructor
+   * @param attemptNumber attempt number passed through to the constructor
+   * @param localDirs local directories passed through to the constructor
+   * @param workingDirectory directory used to locate the user-specified Tez configuration; also
+   *          passed through to the constructor
+   * @param serviceProviderEnvMap environment map passed through to the constructor
+   * @param pid process id passed through to the constructor, may be {@code null}
+   * @param executionContext execution context passed through to the constructor
+   * @return the newly constructed {@link TezChild}
+   * @throws IOException declared by this factory; propagated from the calls it makes
+   * @throws InterruptedException declared by this factory; propagated from the calls it makes
+   * @throws TezException declared by this factory; propagated from the calls it makes
+   */
+  public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
+      String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
+      Map<String, String> serviceProviderEnvMap, @Nullable String pid,
+      ExecutionContext executionContext)
+      throws IOException, InterruptedException, TezException {
+
+    // Pull in configuration specified for the session.
+    // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
+    // for each and every task, and reading it back from disk. Also needs to be per vertex.
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
+    UserGroupInformation.setConfiguration(conf);
+    Limits.setConfiguration(conf);
+
+    // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
+    // of this class. Leaving it here, till there's some entity representing a running JVM.
+    DefaultMetricsSystem.initialize("TezTask");
+
+    // singleton of ObjectRegistry for this JVM
+    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+
+    return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
+        attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
+        executionContext);
+  }
+
+  /**
+   * JVM entry point: parses the command line, builds a {@link TezChild} via
+   * {@link #newTezChild} and runs it.
+   *
+   * <p>Exactly five positional arguments are expected, in this order: AM host, AM port,
+   * container identifier, token identifier, application attempt number. Local directories, the
+   * working directory, the NM host and the JVM pid are read from the process environment.
+   *
+   * @param args the five positional arguments described above
+   * @throws IllegalArgumentException if the number of arguments is not five
+   * @throws NumberFormatException if the port or attempt number is not a valid integer
+   * @throws IOException propagated from {@link #newTezChild} or from running the child
+   * @throws InterruptedException propagated from {@link #newTezChild} or from running the child
+   * @throws TezException propagated from {@link #newTezChild} or from running the child
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, TezException {
+
+    final Configuration defaultConf = new Configuration();
+
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.info("TezChild starting");
+
+    // Validate explicitly instead of with `assert`: assertions are disabled unless the JVM is
+    // started with -ea, so a bad invocation would otherwise fail with an uninformative
+    // ArrayIndexOutOfBoundsException (or silently ignore extra arguments).
+    if (args.length != 5) {
+      throw new IllegalArgumentException("Expected 5 arguments: <AM-host> <AM-port>"
+          + " <containerIdentifier> <tokenIdentifier> <appAttemptNumber>, but received "
+          + args.length);
+    }
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final String containerIdentifier = args[2];
+    final String tokenIdentifier = args[3];
+    final int attemptNumber = Integer.parseInt(args[4]);
+    final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
+        .name()));
+    final String pid = System.getenv().get("JVM_PID");
+    LOG.info("PID, containerIdentifier:  " + pid + ", " + containerIdentifier);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+          + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+          + " tokenIdentifier: " + tokenIdentifier);
+    }
+    TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
+        tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
+        System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())));
+    tezChild.run();
+  }
+
+  /**
+   * Error hook: tears this instance down by delegating to {@link #shutdown()}.
+   *
+   * @param t the failure that triggered the teardown; it is not inspected or logged here
+   */
+  private void handleError(Throwable t) {
+    this.shutdown();
+  }
+
+}