You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/29 19:51:11 UTC

[2/2] git commit: TEZ-1116. Refactor YarnTezDAGChild to be testable and usable for LocalMode. (sseth)

TEZ-1116. Refactor YarnTezDAGChild to be testable and usable for
LocalMode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/acd0a46e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/acd0a46e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/acd0a46e

Branch: refs/heads/master
Commit: acd0a46e36d01ec5d98b9ca263741c4238f0f9aa
Parents: 80b91a4
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 29 10:50:35 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 29 10:50:35 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |   2 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 735 -------------------
 .../tez/dag/utils/TezRuntimeChildJVM.java       |   4 +-
 .../tez/runtime/task/ContainerReporter.java     |  84 +++
 .../apache/tez/runtime/task/ErrorReporter.java  |   8 +
 .../apache/tez/runtime/task/TaskReporter.java   | 383 ++++++++++
 .../org/apache/tez/runtime/task/TezChild.java   | 366 +++++++++
 .../apache/tez/runtime/task/TezTaskRunner.java  | 378 ++++++++++
 .../tez/runtime/task/TestTaskExecution.java     | 661 +++++++++++++++++
 .../org/apache/tez/mapreduce/TestUmbilical.java |   6 +-
 .../org/apache/tez/common/ContainerContext.java |  43 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   3 +-
 .../org/apache/tez/runtime/RuntimeTask.java     |   4 +
 .../runtime/api/impl/TezTaskContextImpl.java    |  18 +-
 .../tez/runtime/api/impl/TezUmbilical.java      |   3 +-
 15 files changed, 1913 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 36fdd59..0b7f52a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -180,7 +180,7 @@ public class TezConfiguration extends Configuration {
    */
   public static final String TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS = TEZ_TASK_PREFIX
       + "am.heartbeat.counter.interval-ms.max";
-  public static final long TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
+  public static final int TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
       1000;
 
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
deleted file mode 100644
index 7c02077..0000000
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapred;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezLocalResource;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.Limits;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.RelocalizationUtils;
-import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-
-/**
- * The main() for TEZ Task processes.
- */
-public class YarnTezDagChild {
-
-  private static final Logger LOG = Logger.getLogger(YarnTezDagChild.class);
-
-  private static AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private static String containerIdStr;
-  private static int maxEventsToGet = 0;
-  private static LinkedBlockingQueue<TezEvent> eventsToSend =
-      new LinkedBlockingQueue<TezEvent>();
-  private static AtomicLong requestCounter = new AtomicLong(0);
-  private static long amPollInterval;
-  private static long hbCounterInterval;
-  private static TezTaskUmbilicalProtocol umbilical;
-  private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
-  private static LogicalIOProcessorRuntimeTask currentTask = null;
-  private static TezTaskAttemptID currentTaskAttemptID;
-  private static AtomicBoolean heartbeatError = new AtomicBoolean(false);
-  private static Throwable heartbeatErrorException = null;
-  // Implies that the task is done - and the AM is being informed.
-  private static AtomicBoolean currentTaskComplete = new AtomicBoolean(true);
-  /**
-   * Used to maintain information about which Inputs have been started by the
-   * framework for the specific DAG. Makes an assumption that multiple DAGs do
-   * not execute concurrently, and must be reset each time the running DAG
-   * changes.
-   */
-  private static Multimap<String, String> startedInputsMap = HashMultimap.create();
-  
-  private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds.
-  private static final float LOG_COUNTER_BACKOFF = 1.3f;
-  private static int taskNonOobHeartbeatCounter = 0;
-  private static int nextHeartbeatNumToLog = 0;
-  private static long prevHeartbeatTimeStamp =  System.currentTimeMillis();
-
-  private static Thread startHeartbeatThread() {
-    Thread heartbeatThread = new Thread(new Runnable() {
-      public void run() {
-        while (!(stopped.get() || heartbeatError.get())) {
-          try {
-            try {
-              if(!heartbeat()) {
-                // AM asked us to die
-                break;
-              }
-            } catch (InvalidToken e) {
-              // FIXME NEWTEZ maybe send a container failed event to AM?
-              // Irrecoverable error unless heartbeat sync can be re-established
-              LOG.error("Heartbeat error in authenticating with AM: ", e);
-              heartbeatErrorException = e;
-              heartbeatError.set(true);
-              break;
-            } catch (Throwable e) {
-              // FIXME NEWTEZ maybe send a container failed event to AM?
-              // Irrecoverable error unless heartbeat sync can be re-established
-              LOG.error("Heartbeat error in communicating with AM. ", e);
-              if (e instanceof Error) {
-                LOG.error("Exception of type Error. Exiting now", e);
-                ExitUtil.terminate(-1, e);
-              }
-              heartbeatErrorException = e;
-              heartbeatError.set(true);
-              break;
-            }
-            Thread.sleep(amPollInterval);
-          } catch (InterruptedException e) {
-            // we were interrupted so that we will stop.
-            LOG.info("Heartbeat thread interrupted. " +
-            " stopped: " + stopped.get() +  " error: " + heartbeatError.get());
-            continue; 
-          }
-        }
-        
-        if (currentTaskComplete.get() || stopped.get()) {
-          // Don't exit. The Tez framework has control, let the container finish after cleanup etc.
-          // Makes an assumption that a heartbeat shouldDie will be reported as a getTask should die.
-          LOG.info("Current task marked as complete. Stopping heartbeat thread and allowing normal container shutdown");
-          return;
-        } else {
-          // Assuming the task is still running, and we've been asked to die or an error occurred.
-          // Stop the process.
-          if (heartbeatErrorException != null) {
-            ExitUtil.terminate(-1, heartbeatErrorException);
-          } else {
-            ExitUtil.terminate(-1, "Exiting Tez Child Process");
-          }
-        }
-      }
-    });
-    heartbeatThread.setName("Tez Container Heartbeat Thread ["
-        + containerIdStr + "]");
-    heartbeatThread.setDaemon(true);
-    heartbeatThread.start();
-    return heartbeatThread;
-  }
-
-  private static synchronized boolean heartbeat() throws TezException, IOException {
-    return heartbeat(null);
-  }
-
-  private static synchronized boolean heartbeat(
-      Collection<TezEvent> outOfBandEvents)
-      throws TezException, IOException {
-    TezEvent updateEvent = null;
-    int eventCounter = 0;
-    int eventsRange = 0;
-    TezTaskAttemptID taskAttemptID = null;
-    List<TezEvent> events = new ArrayList<TezEvent>();
-    try {
-      taskLock.readLock().lock();
-      if (currentTask != null) {
-        eventsToSend.drainTo(events);
-        taskAttemptID = currentTaskAttemptID;
-        eventCounter = currentTask.getEventCounter();
-        eventsRange = maxEventsToGet;
-        if (!currentTask.isTaskDone() && !currentTask.hadFatalError()) {
-          TezCounters counters = null;
-          /**
-           * Increasing the heartbeat interval can delay the delivery of events.
-           * Sending just updated records would save CPU in DAG AM, but certain
-           * counters are updated very frequently. Until real time decisions are made
-           * based on these counters, it can be sent once per second.
-           */
-          if ((System.currentTimeMillis() - prevHeartbeatTimeStamp) > hbCounterInterval) {
-            counters = currentTask.getCounters();
-            prevHeartbeatTimeStamp = System.currentTimeMillis();
-          }
-          updateEvent = new TezEvent(new TaskStatusUpdateEvent(
-              counters, currentTask.getProgress()),
-                new EventMetaData(EventProducerConsumerType.SYSTEM,
-                    currentTask.getVertexName(), "", taskAttemptID));
-          events.add(updateEvent);
-        } else if (outOfBandEvents == null && events.isEmpty()) {
-          LOG.info("Setting TaskAttemptID to null as the task has already"
-            + " completed. Caused by race-condition between the normal"
-            + " heartbeat and out-of-band heartbeats");
-          taskAttemptID = null;
-        } else {
-          if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
-            events.addAll(outOfBandEvents);
-          }
-        }
-      }
-    } finally {
-      taskLock.readLock().unlock();
-    }
-
-    if (LOG.isDebugEnabled()) {
-      taskNonOobHeartbeatCounter++;
-      if (taskNonOobHeartbeatCounter == nextHeartbeatNumToLog) {
-        taskLock.readLock().lock();
-        try {
-          if (currentTask != null) {
-            LOG.debug("Counters: " + currentTask.getCounters().toShortString());
-            taskNonOobHeartbeatCounter = 0;
-            nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
-          }
-        } finally {
-          taskLock.readLock().unlock();
-        }
-      }
-    }
-
-    long reqId = requestCounter.incrementAndGet();
-    TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
-        containerIdStr, taskAttemptID, eventCounter, eventsRange);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending heartbeat to AM"
-          + ", request=" + request.toString());
-    }
-    TezHeartbeatResponse response = umbilical.heartbeat(request);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received heartbeat response from AM"
-          + ", response=" + response);
-    }
-    if(response.shouldDie()) {
-      LOG.info("Received should die response from AM");
-      return false;
-    }
-    if (response.getLastRequestId() != reqId) {
-      throw new TezException("AM and Task out of sync"
-          + ", responseReqId=" + response.getLastRequestId()
-          + ", expectedReqId=" + reqId);
-    }
-    try {
-      taskLock.readLock().lock();
-      if (taskAttemptID == null
-          || !taskAttemptID.equals(currentTaskAttemptID)) {
-        if (response.getEvents() != null
-            && !response.getEvents().isEmpty()) {
-          LOG.warn("No current assigned task, ignoring all events in"
-              + " heartbeat response, eventCount="
-              + response.getEvents().size());
-        }
-        return true;
-      }
-      if (currentTask != null && response.getEvents() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Routing events from heartbeat response to task"
-              + ", currentTaskAttemptId=" + currentTaskAttemptID
-              + ", eventCount=" + response.getEvents().size());
-        }
-        currentTask.handleEvents(response.getEvents());
-      }
-    } finally {
-      taskLock.readLock().unlock();
-    }
-    return true;
-  }
-  
-  public static void main(String[] args) throws Throwable {
-    Thread.setDefaultUncaughtExceptionHandler(
-        new YarnUncaughtExceptionHandler());
-    LOG.info("YarnTezDagChild starting");
-
-    final Configuration defaultConf = new Configuration();
-    TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
-    // Security settings will be loaded based on core-site and core-default.
-    // Don't depend on the jobConf for this.
-    UserGroupInformation.setConfiguration(defaultConf);
-    Limits.setConfiguration(defaultConf);
-
-    assert args.length == 5;
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-    final InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(host, port);
-    final String containerIdentifier = args[2];
-    final String tokenIdentifier = args[3];
-    final int attemptNumber = Integer.parseInt(args[4]);
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Info from cmd line: AM-host: " + host + " AM-port: " + port
-          + " containerIdentifier: " + containerIdentifier + " attemptNumber: "
-          + attemptNumber + " tokenIdentifier: " + tokenIdentifier);
-    }
-    // FIXME fix initialize metrics in child runner
-    DefaultMetricsSystem.initialize("VertexTask");
-    YarnTezDagChild.containerIdStr = containerIdentifier;
-
-    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
-    @SuppressWarnings("unused")
-    Injector injector = Guice.createInjector(
-        new ObjectRegistryModule(objectRegistry));
-
-    // Security framework already loaded the tokens into current ugi
-    Credentials credentials =
-        UserGroupInformation.getCurrentUser().getCredentials();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing with tokens:");
-      for (Token<?> token : credentials.getAllTokens()) {
-        LOG.debug(token);
-      }
-    }
-
-    amPollInterval = defaultConf.getLong(
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    hbCounterInterval = defaultConf.getLong(
-      TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
-      TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
-    maxEventsToGet = defaultConf.getInt(
-        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
-        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
-
-    // Create TaskUmbilicalProtocol as actual task owner.
-    UserGroupInformation taskOwner =
-      UserGroupInformation.createRemoteUser(tokenIdentifier);
-
-    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
-    SecurityUtil.setTokenService(jobToken, address);
-    taskOwner.addToken(jobToken);
-    // Will jobToken change across DAGs ?
-    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
-        ShuffleUtils.convertJobTokenToBytes(jobToken));
-
-    umbilical =
-      taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
-      @Override
-      public TezTaskUmbilicalProtocol run() throws Exception {
-        return (TezTaskUmbilicalProtocol)RPC.getProxy(TezTaskUmbilicalProtocol.class,
-            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
-      }
-    });
-
-    final Thread heartbeatThread = startHeartbeatThread();
-
-    TezUmbilical tezUmbilical = new TezUmbilical() {
-      @Override
-      public void addEvents(Collection<TezEvent> events) {
-        eventsToSend.addAll(events);
-      }
-
-      @Override
-      public void signalFatalError(TezTaskAttemptID taskAttemptID,
-          String diagnostics,
-          EventMetaData sourceInfo) {
-        currentTask.setFrameworkCounters();
-        TezEvent statusUpdateEvent =
-            new TezEvent(new TaskStatusUpdateEvent(
-                currentTask.getCounters(), currentTask.getProgress()),
-                new EventMetaData(EventProducerConsumerType.SYSTEM,
-                    currentTask.getVertexName(), "",
-                    currentTask.getTaskAttemptID()));
-        TezEvent taskAttemptFailedEvent =
-            new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-                sourceInfo);
-        try {
-          // Not setting taskComplete - since the main loop responsible for cleanup doesn't have
-          // control yet. Getting control depends on whether the I/P/O returns correctly after
-          // reporting an error.
-          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
-        } catch (Throwable t) {
-          LOG.fatal("Failed to communicate task attempt failure to AM via"
-              + " umbilical", t);
-          if (t instanceof Error) {
-            LOG.error("Exception of type Error. Exiting now", t);
-            ExitUtil.terminate(-1, t);
-          }
-          // FIXME NEWTEZ maybe send a container failed event to AM?
-          // Irrecoverable error unless heartbeat sync can be re-established
-          heartbeatErrorException = t;
-          heartbeatError.set(true);
-          heartbeatThread.interrupt();
-        }
-      }
-
-      @Override
-      public boolean canCommit(TezTaskAttemptID taskAttemptID)
-          throws IOException {
-        return umbilical.canCommit(taskAttemptID);
-      }
-    };
-
-    // report non-pid to application master
-    String pid = System.getenv().get("JVM_PID");
-    
-    LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier);
-    
-    ContainerTask containerTask = null;
-    UserGroupInformation childUGI = null;
-    ContainerContext containerContext = new ContainerContext(
-        containerIdentifier, pid);
-    int getTaskMaxSleepTime = defaultConf.getInt(
-        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
-        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
-    int taskCount = 0;
-    TezVertexID lastVertexId = null;
-    EventMetaData currentSourceInfo = null;
-    try {
-      String loggerAddend = "";
-      while (true) {
-        // poll for new task
-        if (taskCount > 0) {
-          TezUtils.updateLoggers(loggerAddend);
-        }
-        boolean isNewGetTask = true;
-        long getTaskPollStartTime = System.currentTimeMillis();
-        long nextGetTaskPrintTime = getTaskPollStartTime + 2000l;
-        for (int idle = 0; null == containerTask; ++idle) {
-          if (!isNewGetTask) { // Don't sleep on the first iteration.
-            long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
-            if (sleepTimeMilliSecs + System.currentTimeMillis() > nextGetTaskPrintTime) {
-              LOG.info("Sleeping for "
-                  + sleepTimeMilliSecs
-                  + "ms before retrying getTask again. Got null now. "
-                  + "Next getTask sleep message after 2s");
-              nextGetTaskPrintTime = System.currentTimeMillis() + sleepTimeMilliSecs + 2000l;
-            }
-            MILLISECONDS.sleep(sleepTimeMilliSecs);
-          } else {
-            LOG.info("Attempting to fetch new task");
-          }
-          isNewGetTask = false;
-          containerTask = umbilical.getTask(containerContext);
-        }
-        LOG.info("Got TaskUpdate: "
-            + (System.currentTimeMillis() - getTaskPollStartTime)
-            + " ms after starting to poll."
-            + " TaskInfo: shouldDie: " + containerTask.shouldDie()
-            + (containerTask.shouldDie() == true ? "" : ", currentTaskAttemptId: "
-                + containerTask.getTaskSpec().getTaskAttemptID()));
-        if (containerTask.shouldDie()) {
-          return;
-        }
-        taskCount++;
-
-        // Reset FileSystem statistics
-        FileSystem.clearStatistics();
-
-        // Re-use the UGI only if the Credentials have not changed.
-        if (containerTask.haveCredentialsChanged()) {
-          LOG.info("Refreshing UGI since Credentials have changed");
-          Credentials taskCreds = containerTask.getCredentials();
-          if (taskCreds != null) {
-            LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
-                + taskCreds.numberOfSecretKeys());
-            childUGI = UserGroupInformation.createRemoteUser(System
-                .getenv(ApplicationConstants.Environment.USER.toString()));
-            childUGI.addCredentials(containerTask.getCredentials());
-          } else {
-            LOG.info("Not loading any credentials, since no credentials provided");
-          }
-        }
-
-        Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Additional Resources added to container: " + additionalResources);
-        }
-
-        LOG.info("Localizing additional local resources for Task : " + additionalResources);
-        List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
-            Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
-              @Override
-              public URI apply(TezLocalResource input) {
-                return input.getUri();
-              }
-            }), defaultConf);
-        RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
-
-        LOG.info("Done localizing additional resources");
-        final TaskSpec taskSpec = containerTask.getTaskSpec();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("New container task context:"
-              + taskSpec.toString());
-        }
-
-        try {
-          taskLock.writeLock().lock();
-          currentTaskAttemptID = taskSpec.getTaskAttemptID();
-          TezVertexID newVertexId =
-              currentTaskAttemptID.getTaskID().getVertexID();
-          currentTaskComplete.set(false);
-
-          if (lastVertexId != null) {
-            if (!lastVertexId.equals(newVertexId)) {
-              objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
-            }
-            if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
-              objectRegistry.clearCache(ObjectLifeCycle.DAG);
-              startedInputsMap = HashMultimap.create();
-            }
-          }
-          lastVertexId = newVertexId;
-          TezUtils.updateLoggers(currentTaskAttemptID.toString());
-          loggerAddend = currentTaskAttemptID.toString() + "_post";
-          
-          currentTask = createLogicalTask(attemptNumber, taskSpec,
-              defaultConf, tezUmbilical, serviceConsumerMetadata);
-          
-          taskNonOobHeartbeatCounter = 0;
-          nextHeartbeatNumToLog = (Math.max(1,
-              (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
-                  : (float) amPollInterval))));
-        } finally {
-          taskLock.writeLock().unlock();
-        }
-
-        final EventMetaData sourceInfo = new EventMetaData(
-            EventProducerConsumerType.SYSTEM,
-            taskSpec.getVertexName(), "", currentTaskAttemptID);
-        currentSourceInfo = sourceInfo;
-
-        // TODO Initiate Java VM metrics
-        // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
-
-        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws Exception {
-            try {
-              setFileSystemWorkingDir(defaultConf);
-              LOG.info("Initializing task"
-                  + ", taskAttemptId=" + currentTaskAttemptID);
-              currentTask.initialize();
-              if (!currentTask.hadFatalError()) {
-                LOG.info("Running task"
-                    + ", taskAttemptId=" + currentTaskAttemptID);
-                currentTask.run();
-                LOG.info("Closing task"
-                    + ", taskAttemptId=" + currentTaskAttemptID);
-                currentTask.close();
-              }
-              LOG.info("Task completed"
-                  + ", taskAttemptId=" + currentTaskAttemptID
-                  + ", fatalErrorOccurred=" + currentTask.hadFatalError());
-              // Mark taskComplete - irrespective of failure, framework has control from this point.
-              currentTaskComplete.set(true);
-              // TODONEWTEZ Should the container continue to run if the running task reported a fatal error ?
-              if (!currentTask.hadFatalError()) {
-                // Set counters in case of a successful task.
-                currentTask.setFrameworkCounters();
-                TezEvent statusUpdateEvent =
-                    new TezEvent(new TaskStatusUpdateEvent(
-                        currentTask.getCounters(), currentTask.getProgress()),
-                        new EventMetaData(EventProducerConsumerType.SYSTEM,
-                            currentTask.getVertexName(), "",
-                            currentTask.getTaskAttemptID()));
-                TezEvent taskCompletedEvent =
-                    new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
-                heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
-              } // Should the fatalError be reported ?
-            } finally {
-              currentTask.cleanup();
-            }
-            try {
-              taskLock.writeLock().lock();
-              currentTask = null;
-              currentTaskAttemptID = null;
-            } finally {
-              taskLock.writeLock().unlock();
-            }
-            return null;
-          }
-        });
-        FileSystem.closeAllForUGI(childUGI);
-        containerTask = null;
-        if (heartbeatError.get()) {
-          LOG.fatal("Breaking out of task loop, heartbeat error occurred",
-              heartbeatErrorException);
-          break;
-        }
-      }
-    } catch (FSError e) {
-      // Heartbeats controlled manually after this.
-      stopped.set(true);
-      heartbeatThread.interrupt();
-      LOG.fatal("FSError from child", e);
-      // TODO NEWTEZ this should be a container failed event?
-      try {
-        taskLock.readLock().lock();
-        if (currentTask != null && !currentTask.hadFatalError()) {
-          // TODO Is this of any use if the heartbeat thread is being interrupted first ?
-          // Prevent dup failure events
-          currentTask.setFrameworkCounters();
-          TezEvent statusUpdateEvent =
-              new TezEvent(new TaskStatusUpdateEvent(
-                  currentTask.getCounters(), currentTask.getProgress()),
-                  new EventMetaData(EventProducerConsumerType.SYSTEM,
-                      currentTask.getVertexName(), "",
-                      currentTask.getTaskAttemptID()));
-          currentTask.setFatalError(e, "FS Error in Child JVM");
-          TezEvent taskAttemptFailedEvent =
-              new TezEvent(new TaskAttemptFailedEvent(
-                  StringUtils.stringifyException(e)),
-                  currentSourceInfo);
-          currentTaskComplete.set(true);
-          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
-        }
-      } finally {
-        taskLock.readLock().unlock();
-      }
-    } catch (Throwable throwable) {
-      // Heartbeats controlled manually after this.
-      if (throwable instanceof Error) {
-        LOG.error("Exception of type Error. Exiting now", throwable);
-        ExitUtil.terminate(-1, throwable);
-      }
-      stopped.set(true);
-      heartbeatThread.interrupt();
-      String cause = StringUtils.stringifyException(throwable);
-      LOG.fatal("Error running child : " + cause);
-      taskLock.readLock().lock();
-      try {
-        if (currentTask != null && !currentTask.hadFatalError()) {
-          // TODO Is this of any use if the heartbeat thread is being interrupted first ?
-          // Prevent dup failure events
-          currentTask.setFatalError(throwable, "Error in Child JVM");
-          currentTask.setFrameworkCounters();
-          TezEvent statusUpdateEvent =
-              new TezEvent(new TaskStatusUpdateEvent(
-                  currentTask.getCounters(), currentTask.getProgress()),
-                  new EventMetaData(EventProducerConsumerType.SYSTEM,
-                      currentTask.getVertexName(), "",
-                      currentTask.getTaskAttemptID()));
-          TezEvent taskAttemptFailedEvent =
-            new TezEvent(new TaskAttemptFailedEvent(cause),
-              currentSourceInfo);
-          currentTaskComplete.set(true);
-          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
-        }
-      } finally {
-        taskLock.readLock().unlock();
-      }
-    } finally {
-      stopped.set(true);
-      heartbeatThread.interrupt();
-      RPC.stopProxy(umbilical);
-      DefaultMetricsSystem.shutdown();
-      // Shutting down log4j of the child-vm...
-      // This assumes that on return from Task.run()
-      // there is no more logging done.
-      LogManager.shutdown();
-    }
-  }
-
-  private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum,
-      TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
-      Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
-
-    // FIXME TODONEWTEZ
-    conf.setBoolean("ipc.client.tcpnodelay", true);
-
-    String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
-    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
-    LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
-
-    return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
-        tezUmbilical, serviceConsumerMetadata, startedInputsMap);
-  }
-  
-  // TODONEWTEZ Is this really required ?
-  private static void setFileSystemWorkingDir(Configuration conf) throws IOException {
-    FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
-  }
-
-
-  private static Path getWorkingDirectory(Configuration conf) {
-    String name = conf.get(JobContext.WORKING_DIR);
-    if (name != null) {
-      return new Path(name);
-    } else {
-      try {
-        Path dir = FileSystem.get(conf).getWorkingDirectory();
-        conf.set(JobContext.WORKING_DIR, dir.toString());
-        return dir;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index f91a909..46e200e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -23,10 +23,10 @@ import java.util.List;
 import java.util.Vector;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.YarnTezDagChild;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.runtime.task.TezChild;
 
 public class TezRuntimeChildJVM {
 
@@ -90,7 +90,7 @@ public class TezRuntimeChildJVM {
     }
 
     // Add main class and its arguments
-    vargs.add(YarnTezDagChild.class.getName());  // main of Child
+    vargs.add(TezChild.class.getName());  // main of Child
 
     // pass TaskAttemptListener's address
     vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
new file mode 100644
index 0000000..a68c7c1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+
+/**
+ * Responsible for communication between a running Container and the ApplicationMaster. The main
+ * functionality is to poll for new tasks.
+ * 
+ */
+public class ContainerReporter implements Callable<ContainerTask> {
+
+  private static final Logger LOG = Logger.getLogger(ContainerReporter.class);
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final ContainerContext containerContext;
+  private final int getTaskMaxSleepTime;
+  private final long LOG_INTERVAL = 2000l;
+
+  private long nextGetTaskPrintTime;
+
+  ContainerReporter(TezTaskUmbilicalProtocol umbilical, ContainerContext containerContext,
+      int getTaskMaxSleepTime) {
+    this.umbilical = umbilical;
+    this.containerContext = containerContext;
+    this.getTaskMaxSleepTime = getTaskMaxSleepTime;
+  }
+
+  @Override
+  public ContainerTask call() throws Exception {
+    ContainerTask containerTask = null;
+    LOG.info("Attempting to fetch new task");
+    containerTask = umbilical.getTask(containerContext);
+    long getTaskPollStartTime = System.currentTimeMillis();
+    nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
+    for (int idle = 1; containerTask == null; idle++) {
+      long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
+      maybeLogSleepMessage(sleepTimeMilliSecs);
+      TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
+      containerTask = umbilical.getTask(containerContext);
+    }
+    LOG.info("Got TaskUpdate: "
+        + (System.currentTimeMillis() - getTaskPollStartTime)
+        + " ms after starting to poll."
+        + " TaskInfo: shouldDie: "
+        + containerTask.shouldDie()
+        + (containerTask.shouldDie() == true ? "" : ", currentTaskAttemptId: "
+            + containerTask.getTaskSpec().getTaskAttemptID()));
+    return containerTask;
+  }
+
+  private void maybeLogSleepMessage(long sleepTimeMilliSecs) {
+    long currentTime = System.currentTimeMillis();
+    if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) {
+      LOG.info("Sleeping for " + sleepTimeMilliSecs
+          + "ms before retrying getTask again. Got null now. "
+          + "Next getTask sleep message after " + LOG_INTERVAL + "ms");
+      nextGetTaskPrintTime = currentTime + sleepTimeMilliSecs + LOG_INTERVAL;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
new file mode 100644
index 0000000..8b888ff
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -0,0 +1,8 @@
+package org.apache.tez.runtime.task;
+
+public interface ErrorReporter {
+
+  void reportError(Throwable t);
+  
+  void shutdownRequested();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
new file mode 100644
index 0000000..d860a0b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.mortbay.log.Log;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
+ * retrieve events specific to this task.
+ * 
+ */
+public class TaskReporter {
+
+  private static final Logger LOG = Logger.getLogger(TaskReporter.class);
+
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
+      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+    this.umbilical = umbilical;
+    this.pollInterval = amPollInterval;
+    this.sendCounterInterval = sendCounterInterval;
+    this.maxEventsToGet = maxEventsToGet;
+    this.requestCounter = requestCounter;
+    this.containerIdStr = containerIdStr;
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
+    heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+  }
+
+  /**
+   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
+   */
+  public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
+      ErrorReporter errorReporter) {
+    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr);
+    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * This method should always be invoked before setting up heartbeats for another task running in
+   * the same container.
+   */
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    currentCallable.markComplete();
+    currentCallable = null;
+  }
+  
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  private static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final LogicalIOProcessorRuntimeTask task;
+    private EventMetaData updateEventMetadata;
+
+    private final TezTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
+     * log counters.
+     */
+    private int nonOobHeartbeatCounter = 0;
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM. 
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
+        TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
+        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.task = task;
+      this.umbilical = umbilical;
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      nextHeartbeatNumToLog = (Math.max(1,
+          (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+              : (float) amPollInterval))));
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!task.isTaskDone() && !task.hadFatalError()) {
+        boolean result = heartbeat(null);
+        if (!result) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        }
+        lock.lock();
+        try {
+          boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+          if (!interrupted) {
+            nonOobHeartbeatCounter++;
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      int pendingEventCount = eventsToSend.size();
+      if (pendingEventCount > 0) {
+        LOG.warn("Exiting TaskReporter therad with pending queue size=" + pendingEventCount);
+      }
+      return true;
+    }
+
+    /**
+     * @param eventsArg
+     * @return
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+        TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      TezEvent updateEvent = null;
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      eventsToSend.drainTo(events);
+
+      if (!task.isTaskDone() && !task.hadFatalError()) {
+        TezCounters counters = null;
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
+         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
+         * real time decisions are made based on these counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+        }
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
+            updateEventMetadata);
+        events.add(updateEvent);
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
+          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        Log.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return false;
+      }
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
+      // are running using the same umbilical.
+      if (task.isTaskDone() || task.hadFatalError()) {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          LOG.warn("Current task already complete, Ignoring all event in"
+              + " heartbeat response, eventCount=" + response.getEvents().size());
+        }
+      } else {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+          }
+          // This should ideally happen in a separate thread
+          task.handleEvents(response.getEvents());
+        }
+      }
+      return true;
+
+    }
+
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled()) {
+        if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+          LOG.debug("Counters: " + task.getCounters().toShortString());
+          nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
+        }
+      }
+    }
+
+    /**
+     * Sends out final events for task success.
+     * @param taskAttemptID
+     * @return
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+          updateEventMetadata);
+      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent));
+    }
+
+    /**
+     * Sends out final events for task failure.
+     * @param taskAttemptID
+     * @param t
+     * @param diagnostics
+     * @param srcMeta
+     * @return
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+        EventMetaData srcMeta) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      if (diagnostics == null) {
+        diagnostics = StringUtils.stringifyException(t);
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
+    }
+
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events != null && !events.isEmpty()) {
+        eventsToSend.addAll(events);
+      }
+    }
+  }
+
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter errorReporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      this.errorReporter = errorReporter;
+    }
+
+    @Override
+    public void onSuccess(Boolean result) {
+      if (result == false) {
+        errorReporter.shutdownRequested();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      errorReporter.reportError(t);
+    }
+  }
+
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    return currentCallable.taskSucceeded(taskAttemptID);
+  }
+
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+      EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    currentCallable.addEvents(taskAttemptID, events);
+  }
+
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    return umbilical.canCommit(taskAttemptID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
new file mode 100644
index 0000000..9e0e523
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TezChild {
+
+  private static final Logger LOG = Logger.getLogger(TezChild.class);
+
+  private final Configuration defaultConf;
+  private final String containerIdString;
+  private final int appAttemptNumber;
+  private final InetSocketAddress address;
+  private final String[] localDirs;
+
+  private final AtomicLong heartbeatCounter = new AtomicLong(0);
+
+  private final int getTaskMaxSleepTime;
+  private final int amHeartbeatInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+
+  private final ListeningExecutorService executor;
+  private final ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+  private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+
+  private Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+  private TaskReporter taskReporter;
+  private TezTaskUmbilicalProtocol umbilical;
+  private int taskCount = 0;
+  private TezVertexID lastVertexID;
+
+  public TezChild(Configuration conf, String host, int port, String containerIdentifier,
+      String tokenIdentifier, int appAttemptNumber, String[] localDirs) throws IOException,
+      InterruptedException {
+    this.defaultConf = conf;
+    this.containerIdString = containerIdentifier;
+    this.appAttemptNumber = appAttemptNumber;
+    this.localDirs = localDirs;
+
+    getTaskMaxSleepTime = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
+        TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
+
+    amHeartbeatInterval = defaultConf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+
+    sendCounterInterval = defaultConf.getLong(
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
+
+    maxEventsToGet = defaultConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
+    address = NetUtils.createSocketAddrForHost(host, port);
+
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TezChild").build());
+    this.executor = MoreExecutors.listeningDecorator(executor);
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing with tokens:");
+      for (Token<?> token : credentials.getAllTokens()) {
+        LOG.debug(token);
+      }
+    }
+
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+    SecurityUtil.setTokenService(jobToken, address);
+    taskOwner.addToken(jobToken);
+
+    serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(jobToken));
+
+    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return (TezTaskUmbilicalProtocol) RPC.getProxy(TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+  }
+
+  void run() throws IOException, InterruptedException, TezException {
+
+    ContainerContext containerContext = new ContainerContext(containerIdString);
+    ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
+        getTaskMaxSleepTime);
+
+    taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
+        sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);
+
+    UserGroupInformation childUGI = null;
+
+    while (true) {
+      if (taskCount > 0) {
+        TezUtils.updateLoggers("");
+      }
+      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
+      ContainerTask containerTask = null;
+      try {
+        containerTask = getTaskFuture.get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        handleError(cause);
+        return;
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while waiting for task to complete:"
+            + containerTask.getTaskSpec().getTaskAttemptID());
+        handleError(e);
+        return;
+      }
+      if (containerTask.shouldDie()) {
+        LOG.info("ContainerTask returned shouldDie=true, Exiting");
+        shutdown();
+      } else {
+        String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
+        taskCount++;
+        TezUtils.updateLoggers(loggerAddend);
+        FileSystem.clearStatistics();
+
+        childUGI = handleNewTaskCredentials(containerTask, childUGI);
+        handleNewTaskLocalResources(containerTask);
+        cleanupOnTaskChanged(containerTask);
+
+        // Execute the Actual Task
+        TezTaskRunner taskRunner = new TezTaskRunner(new TezConfiguration(defaultConf), childUGI,
+            localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
+            serviceConsumerMetadata, startedInputsMap, taskReporter, executor);
+        boolean shouldDie = false;
+        try {
+          shouldDie = !taskRunner.run();
+          if (shouldDie) {
+            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+            shutdown();
+          }
+        } catch (IOException e) {
+          handleError(e);
+          return;
+        } catch (TezException e) {
+          handleError(e);
+          return;
+        } finally {
+          FileSystem.closeAllForUGI(childUGI);
+        }
+      }
+    }
+  }
+
+  /**
+   * Setup
+   * 
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   * @param childUGI
+   *          the old UGI instance being used
+   * @return
+   */
+  UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
+      UserGroupInformation childUGI) {
+    // Re-use the UGI only if the Credentials have not changed.
+    Preconditions.checkState(containerTask.shouldDie() != true);
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+    if (containerTask.haveCredentialsChanged()) {
+      LOG.info("Refreshing UGI since Credentials have changed");
+      Credentials taskCreds = containerTask.getCredentials();
+      if (taskCreds != null) {
+        LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+            + taskCreds.numberOfSecretKeys());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        childUGI.addCredentials(containerTask.getCredentials());
+      } else {
+        LOG.info("Not loading any credentials, since no credentials provided");
+      }
+    }
+    return childUGI;
+  }
+
+  /**
+   * Handles any additional resources to be localized for the new task
+   * 
+   * @param containerTask
+   * @throws IOException
+   * @throws TezException
+   */
+  private void handleNewTaskLocalResources(ContainerTask containerTask) throws IOException,
+      TezException {
+    Map<String, TezLocalResource> additionalResources = containerTask.getAdditionalResources();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Additional Resources added to container: " + additionalResources);
+    }
+
+    LOG.info("Localizing additional local resources for Task : " + additionalResources);
+    List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+        Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+          @Override
+          public URI apply(TezLocalResource input) {
+            return input.getUri();
+          }
+        }), defaultConf);
+    RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+    LOG.info("Done localizing additional resources");
+    final TaskSpec taskSpec = containerTask.getTaskSpec();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("New container task context:" + taskSpec.toString());
+    }
+  }
+
+  /**
+   * Cleans entries from the object registry, and resets the startedInputsMap if required
+   * 
+   * @param containerTask
+   *          the new task specification. Must be a valid task
+   */
+  private void cleanupOnTaskChanged(ContainerTask containerTask) {
+    Preconditions.checkState(containerTask.shouldDie() != true);
+    Preconditions.checkState(containerTask.getTaskSpec() != null);
+    TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
+        .getVertexID();
+    if (lastVertexID != null) {
+      if (!lastVertexID.equals(newVertexID)) {
+        objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
+      }
+      if (!lastVertexID.getDAGId().equals(newVertexID.getDAGId())) {
+        objectRegistry.clearCache(ObjectLifeCycle.DAG);
+        startedInputsMap = HashMultimap.create();
+      }
+    }
+    lastVertexID = newVertexID;
+  }
+
+  private void shutdown() {
+    executor.shutdownNow();
+    if (taskReporter != null) {
+      taskReporter.shutdown();
+    }
+    RPC.stopProxy(umbilical);
+    DefaultMetricsSystem.shutdown();
+    LogManager.shutdown();
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException, TezException {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.info("TezChild starting");
+
+    final Configuration defaultConf = new Configuration();
+    // Pull in configuration specified for the session.
+    TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
+    UserGroupInformation.setConfiguration(defaultConf);
+    Limits.setConfiguration(defaultConf);
+
+    assert args.length == 5;
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final String containerIdentifier = args[2];
+    final String tokenIdentifier = args[3];
+    final int attemptNumber = Integer.parseInt(args[4]);
+    final String pid = System.getenv().get("JVM_PID");
+    final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
+        .name()));
+    LOG.info("PID, containerIdentifier:  " + pid + ", " + containerIdentifier);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+          + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+          + " tokenIdentifier: " + tokenIdentifier);
+    }
+
+    // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
+    // of this class. Leaving it here, till there's some entity representing a running JVM.
+    DefaultMetricsSystem.initialize("TezTask");
+
+    ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
+    @SuppressWarnings("unused")
+    Injector injector = Guice.createInjector(new ObjectRegistryModule(objectRegistry));
+
+    TezChild tezChild = new TezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier,
+        attemptNumber, localDirs);
+
+    tezChild.run();
+  }
+
+  private void handleError(Throwable t) {
+    shutdown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/acd0a46e/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
new file mode 100644
index 0000000..0e622f9
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+public class TezTaskRunner implements TezUmbilical, ErrorReporter {
+
+  private static final Logger LOG = Logger.getLogger(TezTaskRunner.class);
+
+  private final Configuration tezConf;
+  private final LogicalIOProcessorRuntimeTask task;
+  private final UserGroupInformation ugi;
+
+  private final TaskReporter taskReporter;
+  private final ListeningExecutorService executor;
+  private volatile ListenableFuture<Void> taskFuture;
+  private volatile Thread waitingThread;
+  private volatile Throwable firstException;
+
+  // Effectively a duplicate check, since hadFatalError does the same thing.
+  private final AtomicBoolean fatalErrorSent = new AtomicBoolean(false);
+  private final AtomicBoolean taskRunning;
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+  TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+      TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
+      Map<String, ByteBuffer> serviceConsumerMetadata, Multimap<String, String> startedInputsMap,
+      TaskReporter taskReporter, ListeningExecutorService executor) throws IOException {
+    this.tezConf = tezConf;
+    this.ugi = ugi;
+    this.tezConf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+    this.taskReporter = taskReporter;
+    this.executor = executor;
+    task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, this,
+        serviceConsumerMetadata, startedInputsMap);
+    taskReporter.registerTask(task, this);
+    taskRunning = new AtomicBoolean(true);
+
+  }
+
+  /**
+   * @return false if a shutdown message was received during task execution
+   * @throws TezException
+   * @throws IOException
+   */
+  public boolean run() throws InterruptedException, IOException, TezException {
+    waitingThread = Thread.currentThread();
+    TaskRunnerCallable callable = new TaskRunnerCallable();
+    Throwable failureCause = null;
+    taskFuture = executor.submit(callable);
+    try {
+      taskFuture.get();
+
+      // Task could signal a fatal error and return control, or a failure while registering success.
+      failureCause = firstException;
+
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted while waiting for task to complete. Interrupting task");
+      taskFuture.cancel(true);
+      if (shutdownRequested.get()) {
+        LOG.info("Shutdown requested... returning");
+        return false;
+      }
+      if (firstException != null) {
+        failureCause = firstException;
+      } else {
+        // Interrupted for some other reason.
+        failureCause = e;
+      }
+    } catch (ExecutionException e) {
+      // Exception thrown by the run() method itself.
+      Throwable cause = e.getCause();
+      if (cause instanceof FSError) {
+        // Not immediately fatal, this is an error reported by Hadoop FileSystem
+        failureCause = cause;
+      } else if (cause instanceof Error) {
+        LOG.error("Exception of type Error. Exiting now", cause);
+        ExitUtil.terminate(-1, cause);
+        // Effectively dead code. Must return something.
+        assert(false);
+        return false;
+      } else {
+        failureCause = cause;
+      }
+    } finally {
+      // Clear the interrupted status of the blocking thread, in case it is set after the
+      // InterruptedException was invoked.
+      taskReporter.unregisterTask(task.getTaskAttemptID());
+      Thread.interrupted();
+    }
+
+    if (failureCause != null) {
+      if (failureCause instanceof FSError) {
+        // Not immediately fatal, this is an error reported by Hadoop FileSystem
+        LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+            failureCause);
+        throw (FSError) failureCause;
+      } else if (failureCause instanceof Error) {
+        LOG.error("Exception of type Error. Exiting now", failureCause);
+        ExitUtil.terminate(-1, failureCause);
+        // Effectively dead code. Must return something.
+        assert(false);
+        return false;
+      } else {
+        if (failureCause instanceof IOException) {
+          throw (IOException) failureCause;
+        } else if (failureCause instanceof TezException) {
+          throw (TezException) failureCause;
+        } else if (failureCause instanceof InterruptedException) {
+          throw (InterruptedException) failureCause;
+        } else {
+          throw new TezException(failureCause);
+        }
+      }
+    }
+    if (shutdownRequested.get()) {
+      LOG.info("Shutdown requested... returning");
+      return false;
+    }
+    return true;
+  }
+
+  private class TaskRunnerCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+              task.initialize();
+              if (!Thread.currentThread().isInterrupted() && firstException == null) {
+                LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+                task.run();
+                LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+                task.close();
+                task.setFrameworkCounters();
+              }
+              LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID()
+                  + ", fatalErrorOccurred=" + (firstException != null));
+              if (firstException == null) {
+                try {
+                  taskReporter.taskSucceeded(task.getTaskAttemptID());
+                } catch (IOException e) {
+                  LOG.warn("Heartbeat failure caused by communication failure", e);
+                  maybeRegisterFirstException(e);
+                  // Falling off, since the runner thread checks for the registered exception.
+                } catch (TezException e) {
+                  LOG.warn("Heartbeat failure reported by AM", e);
+                  maybeRegisterFirstException(e);
+                  // Falling off, since the runner thread checks for the registered exception.
+                }
+              }
+              return null;
+            } catch (Throwable cause) {
+              if (cause instanceof FSError) {
+                // Not immediately fatal, this is an error reported by Hadoop FileSystem
+                maybeRegisterFirstException(cause);
+                LOG.info("Encountered an FSError while executing task: " + task.getTaskAttemptID(),
+                    cause);
+                try {
+                  sendFailure(cause, "FS Error in Child JVM");
+                } catch (Exception ignored) {
+                  // Ignored since another cause is already known
+                  LOG.info(
+                      "Ignoring the following exception since a previous exception is already registered",
+                      ignored);
+                }
+                throw (FSError) cause;
+              } else if (cause instanceof Error) {
+                LOG.error("Exception of type Error. Exiting now", cause);
+                ExitUtil.terminate(-1, cause);
+              } else {
+                if (cause instanceof UndeclaredThrowableException) {
+                  cause = ((UndeclaredThrowableException) cause).getCause();
+                }
+                maybeRegisterFirstException(cause);
+                LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(),
+                    cause);
+                try {
+                  sendFailure(cause, "Failure while running task");
+                } catch (Exception ignored) {
+                  // Ignored since another cause is already known
+                  LOG.info(
+                      "Ignoring the following exception since a previous exception is already registered",
+                      ignored);
+                }
+                if (cause instanceof IOException) {
+                  throw (IOException) cause;
+                } else if (cause instanceof TezException) {
+                  throw (TezException) cause;
+                } else {
+                  throw new TezException(cause);
+                }
+              }
+            } finally {
+              task.cleanup();
+            }
+            return null;
+          }
+        });
+      } finally {
+        taskRunning.set(false);
+      }
+    }
+  }
+
+  private void sendFailure(Throwable t, String message) throws IOException, TezException {
+    if (!fatalErrorSent.getAndSet(true)) {
+      task.setFatalError(t, message);
+      task.setFrameworkCounters();
+      try {
+        taskReporter.taskFailed(task.getTaskAttemptID(), t, message, null);
+      } catch (IOException e) {
+        // A failure reason already exists, Comm error just logged.
+        LOG.warn("Heartbeat failure caused by communication failure", e);
+        throw e;
+      } catch (TezException e) {
+        // A failure reason already exists, Comm error just logged.
+        LOG.warn("Heartbeat failure reported by AM", e);
+        throw e;
+      }
+    } else {
+      LOG.warn("Ignoring fatal error since another error has already been reported", t);
+    }
+  }
+
+  @Override
+  public void addEvents(Collection<TezEvent> events) {
+    if (taskRunning.get()) {
+      taskReporter.addEvents(task.getTaskAttemptID(), events);
+    }
+  }
+
+  @Override
+  public synchronized void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t,
+      String message, EventMetaData sourceInfo) {
+    // This can be called before a task throws an exception or after it.
+    // If called before a task throws an exception
+    // - ensure a heartbeat is sent with the diagnostics, and sent only once.
+    // - interrupt the waiting thread, and make it throw the reported error.
+    // If called after a task throws an exception, the waiting task has already returned, no point
+    // interrupting it.
+    // This case can be effectively ignored (log), as long as the run() method ends up throwing the
+    // exception.
+    //
+    //
+    if (!fatalErrorSent.getAndSet(true)) {
+      maybeRegisterFirstException(t);
+      try {
+        taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
+      } catch (IOException e) {
+        // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+        // occurred earlier.
+        LOG.warn("Heartbeat failure caused by communication failure", e);
+      } catch (TezException e) {
+        // HeartbeatFailed. Don't need to propagate the heartbeat exception since a task exception
+        // occurred earlier.
+        LOG.warn("Heartbeat failure reported by AM", e);
+      } finally {
+        // Wake up the waiting thread so that it can return control
+        waitingThread.interrupt();
+      }
+    }
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+    if (taskRunning.get()) {
+      try {
+        return taskReporter.canCommit(taskAttemptID);
+      } catch (IOException e) {
+        LOG.warn("Communication failure while trying to commit", e);
+        maybeRegisterFirstException(e);
+        waitingThread.interrupt();
+        // Not informing the task since it will be interrupted.
+        // TODO: Should this be sent to the task as well, current Processors, etc do not handle
+        // interrupts very well.
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public synchronized void reportError(Throwable t) {
+    if (t instanceof Error) {
+      LOG.error("Exception of type Error during heartbeat, Exiting Now");
+      ExitUtil.terminate(-1, t);
+    } else if (taskRunning.get()) {
+      LOG.error("TaskReporter reported error", t);
+      maybeRegisterFirstException(t);
+      waitingThread.interrupt();
+      // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
+      // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
+      // method does not throw an exception, in which case task success is registered with the AM.
+      // Leave this handling to the next getTask / actual task.
+    } else {
+      LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
+          + " is already complete");
+    }
+  }
+
+  @Override
+  public void shutdownRequested() {
+    shutdownRequested.set(true);
+    waitingThread.interrupt();
+  }
+
+  private String getTaskDiagnosticsString(Throwable t, String message) {
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t) + ", errorMessage="
+          + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ? "exceptionThrown=" + StringUtils.stringifyException(t)
+          : " errorMessage=" + message;
+    }
+    return diagnostics;
+  }
+
+  private synchronized void maybeRegisterFirstException(Throwable t) {
+    if (firstException == null) {
+      firstException = t;
+    }
+  }
+
+}