You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:53 UTC

[35/50] [abbrv] tez git commit: TEZ-2441. Add tests for TezTaskRunner2. (sseth)

TEZ-2441. Add tests for TezTaskRunner2. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25dd3097
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25dd3097
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25dd3097

Branch: refs/heads/TEZ-2003
Commit: 25dd309778166c2f7db829234bd12aae2e76ecb2
Parents: 17feebc
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 29 18:25:18 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:47:09 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../org/apache/tez/runtime/task/TezChild.java   |   5 +-
 .../apache/tez/runtime/task/TezTaskRunner.java  |   2 +-
 .../apache/tez/runtime/task/TezTaskRunner2.java |  42 +-
 .../runtime/task/TaskExecutionTestHelpers.java  | 451 +++++++++++++
 .../runtime/task/TestContainerExecution.java    |  59 ++
 .../tez/runtime/task/TestTaskExecution.java     | 400 +-----------
 .../tez/runtime/task/TestTaskExecution2.java    | 638 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  19 +
 9 files changed, 1213 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index b88044b..9d72d92 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -39,5 +39,6 @@ ALL CHANGES:
   TEZ-2651. Pluggable services should not extend AbstractService.
   TEZ-2652. Cleanup the way services are specified for an AM and vertices.
   TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
+  TEZ-2441. Add tests for TezTaskRunner2.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 353fe23..b64ec37 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezCommonUtils;
@@ -68,7 +67,6 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
@@ -256,6 +254,7 @@ public class TezChild {
         boolean shouldDie;
         try {
           TaskRunner2Result result = taskRunner.run();
+          LOG.info("TaskRunner2Result: {}", result);
           shouldDie = result.isContainerShutdownRequested();
           if (shouldDie) {
             LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
@@ -377,8 +376,6 @@ public class TezChild {
       }
       if (ownUmbilical) {
         RPC.stopProxy(umbilical);
-        // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
-//        LogManager.shutdown();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index a82d87b..aebf6a9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -250,7 +250,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
                   cause = ((UndeclaredThrowableException) cause).getCause();
                 }
                 maybeRegisterFirstException(cause);
-                LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
+                LOG.info("Encountered an error while executing task: " + task.getTaskAttemptID(),
                     cause);
                 try {
                   sendFailure(cause, "Failure while running task");

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index a5fabb5..1a8828d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
 
 public class TezTaskRunner2 {
 
+  // Behaviour changes as compared to TezTaskRunner
+  // - Exception not thrown. Instead returned in the result.
+  // - The actual exception is part of the result, instead of requiring a getCause().
 
   private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
 
@@ -156,19 +160,7 @@ public class TezTaskRunner2 {
           }
         }
       }
-      if (executionResult != null) {
-        synchronized (this) {
-          if (isRunningState()) {
-            if (executionResult.error != null) {
-              trySettingEndReason(EndReason.TASK_ERROR);
-              registerFirstException(executionResult.error, null);
-            } else {
-              trySettingEndReason(EndReason.SUCCESS);
-              taskComplete.set(true);
-            }
-          }
-        }
-      }
+      processCallableResult(executionResult);
 
       switch (firstEndReason) {
         case SUCCESS:
@@ -249,6 +241,26 @@ public class TezTaskRunner2 {
     }
   }
 
+  // It's possible for the task to actually complete, and an alternate signal such as killTask/killContainer
+  // to come in before the future has been processed by this thread. That condition is not handled - and
+  // the result of the execution will be determined by the thread order.
+  @VisibleForTesting
+  void processCallableResult(TaskRunner2CallableResult executionResult) {
+    if (executionResult != null) {
+      synchronized (this) {
+        if (isRunningState()) {
+          if (executionResult.error != null) {
+            trySettingEndReason(EndReason.TASK_ERROR);
+            registerFirstException(executionResult.error, null);
+          } else {
+            trySettingEndReason(EndReason.SUCCESS);
+            taskComplete.set(true);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Attempt to kill the running task, if it hasn't already completed for some other reason.
    * @return true if the task kill was honored, false otherwise
@@ -438,12 +450,12 @@ public class TezTaskRunner2 {
   private String getTaskDiagnosticsString(Throwable t, String message) {
     String diagnostics;
     if (t != null && message != null) {
-      diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage="
+      diagnostics = "Failure while running task: " + ExceptionUtils.getStackTrace(t) + ", errorMessage="
           + message;
     } else if (t == null && message == null) {
       diagnostics = "Unknown error";
     } else {
-      diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t)
+      diagnostics = t != null ? "Failure while running task: " + ExceptionUtils.getStackTrace(t)
           : " errorMessage=" + message;
     }
     return diagnostics;

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
new file mode 100644
index 0000000..fc42da3
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskExecutionTestHelpers {
+
+  public static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
+  // Uses static fields for signaling. Ensure only used by one test at a time.
+  public static class TestProcessor extends AbstractLogicalIOProcessor {
+
+    public static final byte[] CONF_EMPTY = new byte[] { 0 };
+    public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
+    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
+    public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
+
+    private static final ReentrantLock processorLock = new ReentrantLock();
+    private static final Condition processorCondition = processorLock.newCondition();
+    private static final Condition loopCondition = processorLock.newCondition();
+    private static final Condition completionCondition = processorLock.newCondition();
+    private static final Condition runningCondition = processorLock.newCondition();
+    private static volatile boolean completed = false;
+    private static volatile boolean running = false;
+    private static volatile boolean looping = false;
+    private static volatile boolean signalled = false;
+
+    private static boolean receivedInterrupt = false;
+    private static volatile boolean wasAborted = false;
+
+    private boolean throwIOException = false;
+    private boolean throwTezException = false;
+    private boolean signalFatalAndThrow = false;
+    private boolean signalFatalAndLoop = false;
+    private boolean signalFatalAndComplete = false;
+
+    public TestProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      parseConf(getContext().getUserPayload().deepCopyAsArray());
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    private void parseConf(byte[] bytes) {
+      byte b = bytes[0];
+      throwIOException = (b & 1) > 0;
+      throwTezException = (b & 2) > 0;
+      signalFatalAndThrow = (b & 4) > 0;
+      signalFatalAndLoop = (b & 8) > 0;
+      signalFatalAndComplete = (b & 16) > 0;
+    }
+
+    /**
+     * Clears the static signalling flags shared by all TestProcessor instances so that the
+     * next test starts from a clean state. The flags are written without taking processorLock.
+     */
+    public static void reset() {
+      signalled = false;
+      receivedInterrupt = false;
+      completed = false;
+      running = false;
+      // Clear the loop marker as well; otherwise awaitLoop() in a later test returns at once.
+      looping = false;
+      wasAborted = false;
+    }
+
+    public static void signal() {
+      LOG.info("Signalled");
+      processorLock.lock();
+      try {
+        signalled = true;
+        processorCondition.signal();
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    /**
+     * Blocks the caller until a processor instance has entered {@code run()} and set the
+     * running flag.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public static void awaitStart() throws InterruptedException {
+      LOG.info("Awaiting Process run");
+      processorLock.lock();
+      try {
+        // Re-check the flag in a loop: Condition.await() is allowed to wake up spuriously.
+        while (!running) {
+          runningCondition.await();
+        }
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    /**
+     * Blocks the caller until the processor has reported its fatal error and set the
+     * looping flag (the CONF_SIGNAL_FATAL_AND_LOOP path of {@code run()}).
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public static void awaitLoop() throws InterruptedException {
+      LOG.info("Awaiting loop after signalling error");
+      processorLock.lock();
+      try {
+        // Re-check the flag in a loop: Condition.await() is allowed to wake up spuriously.
+        while (!looping) {
+          loopCondition.await();
+        }
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    /**
+     * Blocks the caller until {@code run()} has finished and set the completed flag.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public static void awaitCompletion() throws InterruptedException {
+      LOG.info("Await completion");
+      processorLock.lock();
+      try {
+        // Re-check the flag in a loop: Condition.await() is allowed to wake up spuriously.
+        while (!completed) {
+          completionCondition.await();
+        }
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    public static boolean wasInterrupted() {
+      processorLock.lock();
+      try {
+        return receivedInterrupt;
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    public static boolean wasAborted() {
+      processorLock.lock();
+      try {
+        return wasAborted;
+      } finally {
+        processorLock.unlock();
+      }
+    }
+
+    @Override
+    public void abort() {
+      wasAborted = true;
+    }
+
+    /**
+     * Test processor body: announces that it is running, waits for {@code signal()}, then acts
+     * on the behaviour flags parsed from the user payload (throw, report a fatal error and
+     * throw / return / loop, or simply return). An InterruptedException is not propagated; it
+     * is recorded and exposed through {@code wasInterrupted()}. Completion is always signalled
+     * before the lock is released.
+     */
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
+        Exception {
+      processorLock.lock();
+      running = true;
+      runningCondition.signal();
+      try {
+        try {
+          LOG.info("Signal is: " + signalled);
+          // Loop rather than test once: Condition.await() is allowed to wake up spuriously.
+          while (!signalled) {
+            LOG.info("Waiting for processor signal");
+            processorCondition.await();
+          }
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+          }
+          LOG.info("Received processor signal");
+          if (throwIOException) {
+            throw createProcessorIOException();
+          } else if (throwTezException) {
+            throw createProcessorTezException();
+          } else if (signalFatalAndThrow) {
+            IOException io = new IOException("FATALERROR");
+            getContext().fatalError(io, "FATALERROR");
+            throw io;
+          } else if (signalFatalAndComplete) {
+            IOException io = new IOException("FATALERROR");
+            getContext().fatalError(io, "FATALERROR");
+            return;
+          } else if (signalFatalAndLoop) {
+            IOException io = createProcessorIOException();
+            getContext().fatalError(io, "FATALERROR");
+            LOG.info("looping");
+            looping = true;
+            loopCondition.signal();
+            LOG.info("Waiting for Processor signal again");
+            // NOTE(review): no dedicated flag guards this second wait, so it cannot be looped
+            // on; it ends on the next signal(), an interrupt, or a spurious wakeup.
+            processorCondition.await();
+            LOG.info("Received second processor signal");
+          }
+        } catch (InterruptedException e) {
+          // Deliberately not rethrown: tests inspect the flag via wasInterrupted().
+          receivedInterrupt = true;
+        }
+      } finally {
+        completed = true;
+        completionCondition.signal();
+        processorLock.unlock();
+      }
+    }
+  }
+
+  public static TezException createProcessorTezException() {
+    return new TezException("TezException");
+  }
+
+  public static IOException createProcessorIOException() {
+    return new IOException("IOException");
+  }
+
+  public static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class);
+
+    private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
+
+    private final ReentrantLock umbilicalLock = new ReentrantLock();
+    private final Condition eventCondition = umbilicalLock.newCondition();
+    private boolean pendingEvent = false;
+    private boolean eventEnacted = false;
+
+    volatile int getTaskInvocations = 0;
+
+    private boolean shouldThrowException = false;
+    private boolean shouldSendDieSignal = false;
+
+    public void signalThrowException() {
+      umbilicalLock.lock();
+      try {
+        shouldThrowException = true;
+        pendingEvent = true;
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void signalSendShouldDie() {
+      umbilicalLock.lock();
+      try {
+        shouldSendDieSignal = true;
+        pendingEvent = true;
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    /**
+     * Blocks the caller until a heartbeat has acted on a behaviour previously requested via
+     * {@code signalThrowException()} or {@code signalSendShouldDie()}.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void awaitRegisteredEvent() throws InterruptedException {
+      umbilicalLock.lock();
+      try {
+        // Re-check the flag in a loop: Condition.await() is allowed to wake up spuriously.
+        while (!eventEnacted) {
+          LOG.info("Awaiting event");
+          eventCondition.await();
+        }
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void resetTrackedEvents() {
+      umbilicalLock.lock();
+      try {
+        requestEvents.clear();
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    /**
+     * Fails the calling test if any event recorded from heartbeats is a
+     * TaskAttemptFailedEvent or a TaskAttemptCompletedEvent.
+     */
+    public void verifyNoCompletionEvents() {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+            fail("Found a TaskAttemptFailedEvent when not expected");
+          }
+          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+            // Message previously misspelled the event type name.
+            fail("Found a TaskAttemptCompletedEvent when not expected");
+          }
+        }
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    public void verifyTaskFailedEvent(String diagnostics) {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
+            TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
+            if (failedEvent.getDiagnostics().startsWith(diagnostics)) {
+              return;
+            } else {
+              fail("Diagnostic message does not match expected message. Found [" +
+                  failedEvent.getDiagnostics() + "], Expected: [" + diagnostics + "]");
+            }
+          }
+        }
+        fail("No TaskAttemptFailedEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    /**
+     * Verifies that the first recorded TaskAttemptFailedEvent carries the expected diagnostics.
+     *
+     * @param diagStart    prefix the diagnostics string must start with
+     * @param diagContains substring the diagnostics must contain, or null to check only the
+     *                     prefix
+     */
+    public void verifyTaskFailedEvent(String diagStart, String diagContains) {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (!(event.getEvent() instanceof TaskAttemptFailedEvent)) {
+            continue;
+          }
+          TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
+          String diagnostics = failedEvent.getDiagnostics();
+          if (!diagnostics.startsWith(diagStart)) {
+            fail("Diagnostic message does not start with expected message. Found [" +
+                diagnostics + "], Expected: [" + diagStart + "]");
+          }
+          // A null diagContains means only the prefix is checked. Previously that case fell
+          // through and ended in the misleading "No TaskAttemptFailedEvents" failure below.
+          if (diagContains == null || diagnostics.contains(diagContains)) {
+            return;
+          }
+          fail("Diagnostic message does not contain expected message. Found [" +
+              diagnostics + "], Expected: [" + diagContains + "]");
+        }
+        fail("No TaskAttemptFailedEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    /**
+     * Fails the calling test unless at least one TaskAttemptCompletedEvent has been recorded
+     * from heartbeats.
+     */
+    public void verifyTaskSuccessEvent() {
+      umbilicalLock.lock();
+      try {
+        for (TezEvent event : requestEvents) {
+          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
+            return;
+          }
+        }
+        // The message previously named TaskAttemptFailedEvents, the wrong event type.
+        fail("No TaskAttemptCompletedEvents sent over umbilical");
+      } finally {
+        umbilicalLock.unlock();
+      }
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+                                                  int clientMethodsHash) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
+      // Return shouldDie = true
+      getTaskInvocations++;
+      return new ContainerTask(null, true, null, null, false);
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    /**
+     * Records the events carried by the request, then either throws an IOException or returns
+     * a response (optionally flagged shouldDie), depending on what was requested via
+     * {@code signalThrowException()} / {@code signalSendShouldDie()}. If such a request was
+     * pending, waiters in {@code awaitRegisteredEvent()} are signalled before the lock is
+     * released.
+     */
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      umbilicalLock.lock();
+      try {
+        // Kept inside the try block so the lock is released even if reading the request fails.
+        if (request.getEvents() != null) {
+          requestEvents.addAll(request.getEvents());
+        }
+        if (shouldThrowException) {
+          LOG.info("TestUmbilical throwing Exception");
+          throw new IOException(HEARTBEAT_EXCEPTION_STRING);
+        }
+        TezHeartbeatResponse response = new TezHeartbeatResponse();
+        response.setLastRequestId(request.getRequestId());
+        if (shouldSendDieSignal) {
+          LOG.info("TestUmbilical returning shouldDie=true");
+          response.setShouldDie();
+        }
+        return response;
+      } finally {
+        if (pendingEvent) {
+          eventEnacted = true;
+          LOG.info("Signalling Event");
+          eventCondition.signal();
+        }
+        umbilicalLock.unlock();
+      }
+    }
+  }
+
+  public static ContainerId createContainerId(ApplicationId appId) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+    return containerId;
+  }
+
+  public static TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
+    TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
+        createContainerId(appId).toString());
+    return taskReporter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
new file mode 100644
index 0000000..c1616af
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.junit.Test;
+
+/**
+ * Exercises ContainerReporter against the test umbilical, whose getTask() always answers
+ * with a shouldDie ContainerTask.
+ */
+public class TestContainerExecution {
+
+  /**
+   * Submits a ContainerReporter and checks that the umbilical's getTask() was invoked
+   * exactly once by the time the future completes.
+   */
+  @Test(timeout = 5000)
+  public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      ContainerContext containerContext = new ContainerContext(containerId.toString());
+
+      ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
+      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+
+      getTaskFuture.get();
+      assertEquals(1, umbilical.getTaskInvocations);
+
+    } finally {
+      // executor is still null if the pool could not be created; do not mask that failure
+      // with a NullPointerException from the cleanup path.
+      if (executor != null) {
+        executor.shutdownNow();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 1bcb337..a99416a 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -18,8 +18,8 @@
 
 package org.apache.tez.runtime.task;
 
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -28,30 +28,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -60,21 +48,13 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TezTaskUmbilicalForTest;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,7 +62,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -91,7 +70,7 @@ public class TestTaskExecution {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution.class);
 
-  private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+
 
   private static final Configuration defaultConf = new Configuration();
   private static final FileSystem localFs;
@@ -137,7 +116,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_EMPTY);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.signal();
       boolean result = taskRunnerFuture.get();
@@ -164,7 +143,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_EMPTY);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.signal();
       boolean result = taskRunnerFuture.get();
@@ -176,7 +155,7 @@ public class TestTaskExecution {
       taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_EMPTY);
       // Setup the executor
-      taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.signal();
       result = taskRunnerFuture.get();
@@ -188,7 +167,7 @@ public class TestTaskExecution {
     }
   }
 
-  // test tasked failed due to exception in Processor
+  // test task failed due to exception in Processor
   @Test(timeout = 5000)
   public void testFailedTask() throws IOException, InterruptedException, TezException {
 
@@ -203,7 +182,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_THROW_TEZ_EXCEPTION);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.awaitStart();
       TestProcessor.signal();
@@ -238,7 +217,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       try {
         taskRunnerFuture.get();
       } catch (ExecutionException e) {
@@ -268,7 +247,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_EMPTY);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.awaitStart();
       umbilical.signalThrowException();
@@ -280,7 +259,7 @@ public class TestTaskExecution {
       } catch (ExecutionException e) {
         Throwable cause = e.getCause();
         assertTrue(cause instanceof IOException);
-        assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING));
+        assertTrue(cause.getMessage().contains(TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING));
       }
       TestProcessor.awaitCompletion();
       assertTrue(TestProcessor.wasInterrupted());
@@ -307,7 +286,7 @@ public class TestTaskExecution {
       TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
           TestProcessor.CONF_EMPTY);
       // Setup the executor
-      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
+      Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner));
       // Signal the processor to go through
       TestProcessor.awaitStart();
       umbilical.signalSendShouldDie();
@@ -329,38 +308,14 @@ public class TestTaskExecution {
     }
   }
 
-  @Test(timeout = 5000)
-  public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
-    ListeningExecutorService executor = null;
-    try {
-      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
-      executor = MoreExecutors.listeningDecorator(rawExecutor);
-      ApplicationId appId = ApplicationId.newInstance(10000, 1);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-
-      TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
-      ContainerContext containerContext = new ContainerContext(containerId.toString());
-
-      ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
-      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
-
-      getTaskFuture.get();
-      assertEquals(1, umbilical.getTaskInvocations);
-
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
   // Potential new tests
   // Different states - initialization failure, close failure
   // getTask states
 
-  private static class TaskRunnerCallable implements Callable<Boolean> {
+  private static class TaskRunnerCallable1ForTest implements Callable<Boolean> {
     private final TezTaskRunner taskRunner;
 
-    public TaskRunnerCallable(TezTaskRunner taskRunner) {
+    public TaskRunnerCallable1ForTest(TezTaskRunner taskRunner) {
       this.taskRunner = taskRunner;
     }
 
@@ -370,328 +325,9 @@ public class TestTaskExecution {
     }
   }
 
-  // Uses static fields for signaling. Ensure only used by one test at a time.
-  public static class TestProcessor extends AbstractLogicalIOProcessor {
-
-    public static final byte[] CONF_EMPTY = new byte[] { 0 };
-    public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 };
-    public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 };
-    public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 };
-
-    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
-
-    private static final ReentrantLock processorLock = new ReentrantLock();
-    private static final Condition processorCondition = processorLock.newCondition();
-    private static final Condition completionCondition = processorLock.newCondition();
-    private static final Condition runningCondition = processorLock.newCondition();
-    private static boolean completed = false;
-    private static boolean running = false;
-    private static boolean signalled = false;
-
-    public static boolean receivedInterrupt = false;
-
-    private boolean throwIOException = false;
-    private boolean throwTezException = false;
-    private boolean signalFatalAndThrow = false;
-    private boolean signalFatalAndLoop = false;
-    private boolean signalFatalAndComplete = false;
-
-    public TestProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void initialize() throws Exception {
-      parseConf(getContext().getUserPayload().deepCopyAsArray());
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
 
-    private void parseConf(byte[] bytes) {
-      byte b = bytes[0];
-      throwIOException = (b & 1) > 1;
-      throwTezException = (b & 2) > 1;
-      signalFatalAndThrow = (b & 4) > 1;
-      signalFatalAndLoop = (b & 8) > 1;
-      signalFatalAndComplete = (b & 16) > 1;
-    }
 
-    public static void reset() {
-      signalled = false;
-      receivedInterrupt = false;
-      completed = false;
-      running = false;
-    }
 
-    public static void signal() {
-      LOG.info("Signalled");
-      processorLock.lock();
-      try {
-        signalled = true;
-        processorCondition.signal();
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static void awaitStart() throws InterruptedException {
-      LOG.info("Awaiting Process run");
-      processorLock.lock();
-      try {
-        if (running) {
-          return;
-        }
-        runningCondition.await();
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static void awaitCompletion() throws InterruptedException {
-      LOG.info("Await completion");
-      processorLock.lock();
-      try {
-        if (completed) {
-          return;
-        } else {
-          completionCondition.await();
-        }
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    public static boolean wasInterrupted() {
-      processorLock.lock();
-      try {
-        return receivedInterrupt;
-      } finally {
-        processorLock.unlock();
-      }
-    }
-
-    @Override
-    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
-        Exception {
-      processorLock.lock();
-      running = true;
-      runningCondition.signal();
-      try {
-        try {
-          LOG.info("Signal is: " + signalled);
-          if (!signalled) {
-            LOG.info("Waiting for processor signal");
-            processorCondition.await();
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedException();
-          }
-          LOG.info("Received processor signal");
-          if (throwIOException) {
-            throw new IOException();
-          } else if (throwTezException) {
-            throw new TezException("TezException");
-          } else if (signalFatalAndThrow) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            throw io;
-          } else if (signalFatalAndComplete) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            return;
-          } else if (signalFatalAndLoop) {
-            IOException io = new IOException("FATALERROR");
-            getContext().fatalError(io, "FATALERROR");
-            LOG.info("Waiting for Processor signal again");
-            processorCondition.await();
-            LOG.info("Received second processor signal");
-          }
-        } catch (InterruptedException e) {
-          receivedInterrupt = true;
-        }
-      } finally {
-        completed = true;
-        completionCondition.signal();
-        processorLock.unlock();
-      }
-    }
-  }
-
-  private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class);
-
-    private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
-
-    private final ReentrantLock umbilicalLock = new ReentrantLock();
-    private final Condition eventCondition = umbilicalLock.newCondition();
-    private boolean pendingEvent = false;
-    private boolean eventEnacted = false;
-
-    volatile int getTaskInvocations = 0;
-
-    private boolean shouldThrowException = false;
-    private boolean shouldSendDieSignal = false;
-
-    public void signalThrowException() {
-      umbilicalLock.lock();
-      try {
-        shouldThrowException = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void signalSendShouldDie() {
-      umbilicalLock.lock();
-      try {
-        shouldSendDieSignal = true;
-        pendingEvent = true;
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void awaitRegisteredEvent() throws InterruptedException {
-      umbilicalLock.lock();
-      try {
-        if (eventEnacted) {
-          return;
-        }
-        LOG.info("Awaiting event");
-        eventCondition.await();
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void resetTrackedEvents() {
-      umbilicalLock.lock();
-      try {
-        requestEvents.clear();
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyNoCompletionEvents() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            fail("Found a TaskAttemptFailedEvent when not expected");
-          }
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            fail("Found a TaskAttemptCompletedvent when not expected");
-          }
-        }
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyTaskFailedEvent(String diagnostics) {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptFailedEvent) {
-            TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent();
-            if(failedEvent.getDiagnostics().startsWith(diagnostics)){
-              return ;
-            } else {
-              fail("No detailed diagnostics message in TaskAttemptFailedEvent");
-            }
-          }
-        }
-        fail("No TaskAttemptFailedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    public void verifyTaskSuccessEvent() {
-      umbilicalLock.lock();
-      try {
-        for (TezEvent event : requestEvents) {
-          if (event.getEvent() instanceof TaskAttemptCompletedEvent) {
-            return;
-          }
-        }
-        fail("No TaskAttemptFailedEvents sent over umbilical");
-      } finally {
-        umbilicalLock.unlock();
-      }
-    }
-
-    @Override
-    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-      return 0;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-        int clientMethodsHash) throws IOException {
-      return null;
-    }
-
-    @Override
-    public ContainerTask getTask(ContainerContext containerContext) throws IOException {
-      // Return shouldDie = true
-      getTaskInvocations++;
-      return new ContainerTask(null, true, null, null, false);
-    }
-
-    @Override
-    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    @Override
-    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
-        TezException {
-      umbilicalLock.lock();
-      if (request.getEvents() != null) {
-        requestEvents.addAll(request.getEvents());
-      }
-      try {
-        if (shouldThrowException) {
-          LOG.info("TestUmbilical throwing Exception");
-          throw new IOException(HEARTBEAT_EXCEPTION_STRING);
-        }
-        TezHeartbeatResponse response = new TezHeartbeatResponse();
-        response.setLastRequestId(request.getRequestId());
-        if (shouldSendDieSignal) {
-          LOG.info("TestUmbilical returning shouldDie=true");
-          response.setShouldDie();
-        }
-        return response;
-      } finally {
-        if (pendingEvent) {
-          eventEnacted = true;
-          LOG.info("Signalling Event");
-          eventCondition.signal();
-        }
-        umbilicalLock.unlock();
-      }
-    }
-  }
-
-  private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
-    TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
-        createContainerId(appId).toString());
-    return taskReporter;
-  }
 
   private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical,
       TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf)
@@ -722,9 +358,5 @@ public class TestTaskExecution {
     return taskRunner;
   }
 
-  private ContainerId createContainerId(ApplicationId appId) {
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
-    return containerId;
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
new file mode 100644
index 0000000..12d9d3f
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorIOException;
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorTezException;
+import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.resources.ScalingAllocator;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskExecution2 {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution2.class);
+
+  private static final Configuration defaultConf = new Configuration();
+  private static final FileSystem localFs;
+  private static final Path workDir;
+
+  private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
+
+  static { // Point the shared config at the local FS and resolve this test's work dir
+    defaultConf.set("fs.defaultFS", "file:///");
+    defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
+          TestTaskExecution2.class.getSimpleName()); // work dir named after this test class
+      workDir = localFs.makeQualified(wd);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  public void reset() {
+    TestProcessor.reset();
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    taskExecutor.shutdownNow();
+  }
+
+  @Test(timeout = 5000)
+  public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
+          new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+      assertFalse(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
+          new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+      assertFalse(TestProcessor.wasAborted());
+      umbilical.resetTrackedEvents();
+
+      taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.signal();
+      result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+      assertFalse(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // test task failed due to a TezException thrown by the Processor
+  @Test(timeout = 5000)
+  public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_THROW_TEZ_EXCEPTION);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false);
+
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent(
+          "Failure while running task",
+          TezException.class.getName() + ": " + TezException.class.getSimpleName());
+      // Failure detected as a result of fall off from the run method. abort isn't required.
+      assertFalse(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+
+  // Test task failed due to Processor class not found
+  @Test(timeout = 5000)
+  public void testFailedTask2() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          "NotExitedProcessor", TestProcessor.CONF_EMPTY, false);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
+          new TezUncheckedException("Unchecked exception"), false);
+
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent("Failure while running task",
+          ":org.apache.tez.dag.api.TezUncheckedException: "
+              + "Unable to load class: NotExitedProcessor");
+      // Failure detected as a result of fall off from the run method. abort isn't required.
+      assertFalse(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  // test task failed due to an IOException thrown by the Processor
+  @Test(timeout = 5000)
+  public void testFailedTaskIOException() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_THROW_IO_EXCEPTION);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Signal the processor to go through
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false);
+
+
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent(
+          "Failure while running task",
+          IOException.class.getName() + ": " + IOException.class.getSimpleName());
+      // Failure detected as a result of fall off from the run method. abort isn't required.
+      assertFalse(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testHeartbeatException() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Setup the executor
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Wait for the processor to start, then make the umbilical throw on heartbeat
+      TestProcessor.awaitStart();
+      umbilical.signalThrowException();
+      umbilical.awaitRegisteredEvent();
+      // Not signaling an actual start to verify task interruption
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE,
+          new IOException("IOException"),
+          TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false);
+
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      // No completion events since umbilical communication already failed.
+      umbilical.verifyNoCompletionEvents();
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    // Expects a should-die signal on the umbilical to end the run with CONTAINER_STOP_REQUESTED.
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Run the runner on taskExecutor; the local 'executor' is the pool handed to the runner.
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Wait for the processor to start, then tell the test umbilical to send should-die.
+      TestProcessor.awaitStart();
+      umbilical.signalSendShouldDie();
+      umbilical.awaitRegisteredEvent();
+      // Not signaling an actual start to verify task interruption
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true);
+
+
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      // TODO Is this statement correct ?
+      // No completion events since shouldDie was requested by the AM, which should have killed the
+      // task.
+      umbilical.verifyNoCompletionEvents();
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testSignalFatalErrorAndLoop() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    // Expects a processor configured with CONF_SIGNAL_FATAL_AND_LOOP to end with TASK_ERROR.
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_SIGNAL_FATAL_AND_LOOP);
+      // Run the runner on taskExecutor; the local 'executor' is the pool handed to the runner.
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Wait for the processor to start, then signal it to go through.
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+
+      TestProcessor.awaitLoop();
+      // The fatal error should have caused an interrupt.
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false);
+
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskFailedEvent(
+          "Failure while running task",
+          IOException.class.getName() + ": " + IOException.class.getSimpleName());
+      // Signal fatal error should cause the processor to fail.
+      assertTrue(TestProcessor.wasAborted());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testTaskKilled() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    // Expects killTask() on a started task to end the run with KILL_REQUESTED and no error.
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
+          TestProcessor.CONF_EMPTY);
+      // Run the runner on taskExecutor; the local 'executor' is the pool handed to the runner.
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Wait for the processor to start; TestProcessor.signal() is not called before the kill.
+      TestProcessor.awaitStart();
+
+      taskRunner.killTask();
+
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false);
+
+      TestProcessor.awaitCompletion();
+      assertTrue(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      // Kill events are not sent over the umbilical at the moment.
+      umbilical.verifyNoCompletionEvents();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testKilledAfterComplete() throws IOException, InterruptedException, TezException,
+      ExecutionException {
+    // Expects a killTask() issued after the callable has completed to leave the result as SUCCESS.
+    ListeningExecutorService executor = null;
+    try {
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
+      executor = MoreExecutors.listeningDecorator(rawExecutor);
+      ApplicationId appId = ApplicationId.newInstance(10000, 1);
+      TaskExecutionTestHelpers.TezTaskUmbilicalForTest
+          umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
+      TaskReporter taskReporter = createTaskReporter(appId, umbilical);
+
+      TezTaskRunner2ForTest taskRunner =
+          createTaskRunnerForTest(appId, umbilical, taskReporter, executor,
+              TestProcessor.CONF_EMPTY);
+      // Run the runner on taskExecutor; the local 'executor' is the pool handed to the runner.
+      Future<TaskRunner2Result> taskRunnerFuture =
+          taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
+      // Wait for the processor to start, signal it to go through, and wait for it to complete.
+      TestProcessor.awaitStart();
+      TestProcessor.signal();
+      TestProcessor.awaitCompletion();
+      // Also wait until the runner has processed the callable result before issuing the kill.
+      taskRunner.awaitCallableCompletion();
+
+      taskRunner.killTask();
+      TaskRunner2Result result = taskRunnerFuture.get();
+      verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false);
+
+      assertFalse(TestProcessor.wasInterrupted());
+      assertNull(taskReporter.currentCallable);
+      umbilical.verifyTaskSuccessEvent();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+
+  private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
+                                      EndReason expectedEndReason, Throwable expectedThrowable,
+                                      boolean wasShutdownRequested) {
+    verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, expectedThrowable, null,
+        wasShutdownRequested); // the null above disables the exception-message check
+  }
+
+  private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
+                                      EndReason expectedEndReason, Throwable expectedThrowable,
+                                      String expectedExceptionMessage,
+                                      boolean wasShutdownRequested) {
+    assertEquals(expectedEndReason, taskRunner2Result.getEndReason());
+    if (expectedThrowable == null) {
+      assertNull(taskRunner2Result.getError());
+    } else {
+      assertNotNull(taskRunner2Result.getError());
+      Throwable cause = taskRunner2Result.getError();
+      LOG.info(cause.getClass().getName());
+      assertTrue(cause.getClass().isAssignableFrom(expectedThrowable.getClass()));
+      // NOTE(review): the check above also accepts an error whose class is a supertype of expected.
+      if (expectedExceptionMessage != null) {
+        assertTrue(cause.getMessage().contains(expectedExceptionMessage));
+      }
+
+    }
+    assertEquals(wasShutdownRequested, taskRunner2Result.isContainerShutdownRequested());
+  }
+
+
+  private static class TaskRunnerCallable2ForTest implements Callable<TaskRunner2Result> {
+    private final TezTaskRunner2 taskRunner;
+    // Adapter so a runner can be submitted to an executor; call() just delegates to run().
+    public TaskRunnerCallable2ForTest(TezTaskRunner2 taskRunner) {
+      this.taskRunner = taskRunner;
+    }
+
+    @Override
+    public TaskRunner2Result call() throws Exception {
+      return taskRunner.run();
+    }
+  }
+
+  private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+                                          TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+                                          TaskReporter taskReporter,
+                                          ListeningExecutorService executor, byte[] processorConf)
+      throws IOException {
+    return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
+        processorConf, false); // false: build a plain TezTaskRunner2 running TestProcessor
+  }
+
+  private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
+                                                        TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+                                                        TaskReporter taskReporter,
+                                                        ListeningExecutorService executor,
+                                                        byte[] processorConf)
+      throws IOException {
+    return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor,
+        TestProcessor.class.getName(),
+        processorConf, true); // true selects TezTaskRunner2ForTest, so the cast above holds
+  }
+
+  private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+                                          TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+                                          TaskReporter taskReporter,
+                                          ListeningExecutorService executor, String processorClass,
+                                          byte[] processorConf, boolean testRunner) throws
+      IOException {
+    TezConfiguration tezConf = new TezConfiguration(defaultConf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Path testDir = new Path(workDir, UUID.randomUUID().toString());
+    String[] localDirs = new String[]{testDir.toString()};
+    // Build the dag -> vertex -> task -> attempt id chain under appId, each created with 1.
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
+    ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
+    TaskSpec taskSpec =
+        new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
+            new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+    // testRunner selects TezTaskRunner2ForTest; both variants get identical constructor arguments.
+    TezTaskRunner2 taskRunner;
+    if (testRunner) {
+      taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1,
+          new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
+          HashMultimap.<String, String>create(), taskReporter,
+          executor, null, "", new ExecutionContextImpl("localhost"),
+          Runtime.getRuntime().maxMemory());
+    } else {
+      taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
+          new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
+          HashMultimap.<String, String>create(), taskReporter,
+          executor, null, "", new ExecutionContextImpl("localhost"),
+          Runtime.getRuntime().maxMemory());
+    }
+    // NOTE(review): the umbilical parameter is not referenced anywhere in this method body.
+    return taskRunner;
+  }
+
+  public static class TezTaskRunner2ForTest extends TezTaskRunner2 {
+    // Test subclass that lets a test block until processCallableResult() has finished.
+    private final ReentrantLock testLock = new ReentrantLock();
+    private final Condition callableCompletionCondition = testLock.newCondition();
+    // Set to true, while holding testLock, once super.processCallableResult() has returned.
+    private final AtomicBoolean isCallableComplete = new AtomicBoolean(false);
+
+    public TezTaskRunner2ForTest(Configuration tezConf, UserGroupInformation ugi,
+                                 String[] localDirs,
+                                 TaskSpec taskSpec, int appAttemptNumber,
+                                 Map<String, ByteBuffer> serviceConsumerMetadata,
+                                 Map<String, String> serviceProviderEnvMap,
+                                 Multimap<String, String> startedInputsMap,
+                                 TaskReporterInterface taskReporter,
+                                 ListeningExecutorService executor,
+                                 ObjectRegistry objectRegistry,
+                                 String pid,
+                                 ExecutionContext executionContext,
+                                 long memAvailable) throws IOException {
+      super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
+          serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
+          executionContext, memAvailable);
+    }
+
+    // Delegates to the superclass, then records completion and signals one waiter on the condition.
+    @Override
+    @VisibleForTesting
+    void processCallableResult(TaskRunner2Callable.TaskRunner2CallableResult executionResult) {
+      testLock.lock();
+      try {
+        super.processCallableResult(executionResult);
+        isCallableComplete.set(true);
+        callableCompletionCondition.signal();
+      } finally {
+        testLock.unlock();
+      }
+    }
+    // Blocks until processCallableResult() has completed; the flag is re-checked after each wakeup.
+    void awaitCallableCompletion() throws InterruptedException {
+      testLock.lock();
+      try {
+        while (!isCallableComplete.get()) {
+          callableCompletionCondition.await();
+        }
+      } finally {
+        testLock.unlock();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/25dd3097/tez-runtime-internals/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/resources/log4j.properties b/tez-runtime-internals/src/test/resources/log4j.properties
new file mode 100644
index 0000000..531b68b
--- /dev/null
+++ b/tez-runtime-internals/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j 1.x configuration used during build and unit tests (root logger: INFO to the console)
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n