You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/15 22:21:26 UTC

[2/2] tez git commit: TEZ-2774. Improvements and cleanup of logging for the AM and parts of the runtime. Contributed by Siddharth Seth and Bikas Saha.

TEZ-2774. Improvements and cleanup of logging for the AM and parts of
the runtime. Contributed by Siddharth Seth and Bikas Saha.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f785ce8d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f785ce8d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f785ce8d

Branch: refs/heads/master
Commit: f785ce8d8653a469c8c6e6a9bbcfcff40c6e1289
Parents: d93bdc7
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 15 13:20:37 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 15 13:20:37 2015 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/AsyncDispatcher.java  |  14 +-
 .../tez/common/AsyncDispatcherConcurrent.java   |   9 +-
 .../org/apache/tez/common/TezUtilsInternal.java |  14 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  27 +++-
 .../tez/dag/app/TaskCommunicatorManager.java    |   7 +-
 .../app/dag/RootInputInitializerManager.java    |   2 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  14 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  34 +++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 128 +++++++++---------
 .../app/launcher/TezContainerLauncherImpl.java  |   8 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  19 +--
 .../dag/app/rm/YarnTaskSchedulerService.java    | 132 +++++++++++--------
 .../app/rm/container/AMContainerHelpers.java    |  10 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  18 +--
 .../dag/app/rm/node/PerSourceNodeTracker.java   |   2 +
 .../tez/dag/history/HistoryEventHandler.java    |   8 +-
 .../events/TaskAttemptFinishedEvent.java        |  12 +-
 .../history/events/TaskAttemptStartedEvent.java |   4 +-
 .../impl/SimpleHistoryLoggingService.java       |   4 +-
 .../dag/history/recovery/RecoveryService.java   |   8 +-
 .../resources/tez-container-log4j.properties    |   2 +-
 .../mapreduce/committer/MROutputCommitter.java  |   3 +-
 .../common/MRInputAMSplitGenerator.java         |  31 ++---
 .../common/MRInputSplitDistributor.java         |   7 +-
 .../tez/mapreduce/hadoop/MRInputHelpers.java    |  14 +-
 .../tez/mapreduce/partition/MRPartitioner.java  |  18 ++-
 .../logging/ats/ATSHistoryLoggingService.java   |   9 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  50 ++++---
 .../runtime/api/impl/TezInputContextImpl.java   |   4 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   4 +-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../common/resources/MemoryDistributor.java     |  76 +++++++++--
 .../tez/runtime/metrics/TaskCounterUpdater.java |   4 +-
 .../tez/runtime/task/ContainerReporter.java     |   2 +-
 .../org/apache/tez/runtime/task/TezChild.java   |  38 +++---
 .../runtime/library/common/TezRuntimeUtils.java |   1 -
 .../WeightedScalingMemoryDistributor.java       |   8 +-
 .../tez/mapreduce/examples/RPCLoadGen.java      |   2 -
 39 files changed, 449 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 4319f4f..159ccd9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   @Override
   protected void serviceStart() throws Exception {
     eventHandlingThread = new Thread(createThread());
-    eventHandlingThread.setName("Dispatcher thread: " + name);
+    eventHandlingThread.setName("Dispatcher thread {" + name + "}");
     eventHandlingThread.start();
     
     //start all the components
@@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
-    Preconditions.checkState(concurrentDispatcher == null, 
+    Preconditions.checkState(concurrentDispatcher == null,
         "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
   }
   
@@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
@@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+    LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index d19bf9e..321ea8b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
   @Override
   protected void serviceStart() throws Exception {
     execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+        .setNameFormat("Dispatcher {" + this.name + "} #%d").build());
     for (int i=0; i<numThreads; ++i) {
       eventQueues.add(new LinkedBlockingQueue<Event>());
     }
@@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
 
   private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
-    Preconditions.checkState(registeredDispatcher == null, 
+    Preconditions.checkState(registeredDispatcher == null,
         "Multiple dispatchers cannot be registered for: " + eventType.getName());
   }
 
@@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
     LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index d6ef901..c2a50f5 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -109,13 +109,10 @@ public class TezUtilsInternal {
 
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] compressed = compressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
           + ", CompressTime: " + sw.elapsedMillis());
     }
@@ -123,13 +120,10 @@ public class TezUtilsInternal {
   }
 
   public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
           + ", UncompressTimeTaken: " + sw.elapsedMillis());
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fee13c1..c713435 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -467,7 +467,6 @@ public class DAGAppMaster extends AbstractService {
 
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
-    LOG.info("Adding session token to jobTokenSecretManager for application");
     jobTokenSecretManager.addTokenForJob(
         appAttemptID.getApplicationId().toString(), sessionToken);
 
@@ -495,8 +494,11 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
-    if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
-        TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+    boolean useConcurrentDispatcher =
+        conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+            TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
+    LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
+    if (!useConcurrentDispatcher) {
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
     } else {
@@ -560,7 +562,7 @@ public class DAGAppMaster extends AbstractService {
     currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
         appAttemptID.getAttemptId());
     if (LOG.isDebugEnabled()) {
-      LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
           + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
           + " recoveryAttemptDir :" + currentRecoveryDataDir);
     }
@@ -926,7 +928,7 @@ public class DAGAppMaster extends AbstractService {
 
     try {
       if (LOG.isDebugEnabled()) {
-        LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+        LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
             + ", json="
             + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
       }
@@ -2102,6 +2104,7 @@ public class DAGAppMaster extends AbstractService {
   public static void main(String[] args) {
     try {
       Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      final String pid = System.getenv().get("JVM_PID");
       String containerIdStr =
           System.getenv(Environment.CONTAINER_ID.name());
       String nodeHostString = System.getenv(Environment.NM_HOST.name());
@@ -2141,6 +2144,18 @@ public class DAGAppMaster extends AbstractService {
           false, "Run Tez Application Master in Session mode");
 
       CommandLine cliParser = new GnuParser().parse(opts, args);
+      boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+      LOG.info("Creating DAGAppMaster for "
+          + "applicationId=" + applicationAttemptId.getApplicationId()
+          + ", attemptNum=" + applicationAttemptId.getAttemptId()
+          + ", AMContainerId=" + containerId
+          + ", jvmPid=" + pid
+          + ", userFromEnv=" + jobUserName
+          + ", cliSessionOption=" + sessionModeCliOption
+          + ", pwd=" + System.getenv(Environment.PWD.name())
+          + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name())
+          + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name()));
 
       // TODO Does this really need to be a YarnConfiguration ?
       Configuration conf = new Configuration(new YarnConfiguration());
@@ -2161,7 +2176,7 @@ public class DAGAppMaster extends AbstractService {
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime,
-              cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION),
+              sessionModeCliOption,
               System.getenv(Environment.PWD.name()),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 2cc6ae2..0bc02dc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -155,13 +155,13 @@ public class TaskCommunicatorManager extends AbstractService implements
 
   @VisibleForTesting
   TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-    LOG.info("Using Default Task Communicator");
+    LOG.info("Creating Default Task Communicator");
     return new TezTaskCommunicatorImpl(taskCommunicatorContext);
   }
 
   @VisibleForTesting
   TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-    LOG.info("Using Default Local Task Communicator");
+    LOG.info("Creating Default Local Task Communicator");
     return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
   }
 
@@ -169,7 +169,7 @@ public class TaskCommunicatorManager extends AbstractService implements
   TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
                                                 NamedEntityDescriptor taskCommDescriptor)
                                                     throws TezException {
-    LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
+    LOG.info("Creating TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
         taskCommDescriptor.getClassName());
     Class<? extends TaskCommunicator> taskCommClazz =
         (Class<? extends TaskCommunicator>) ReflectionUtils
@@ -322,7 +322,6 @@ public class TaskCommunicatorManager extends AbstractService implements
    */
 //  @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
-    LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
     // An attempt is asking if it can commit its output. This can be decided
     // only by the task which is managing the multiple attempts. So redirect the
     // request there.

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4a8a286..13128f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -100,7 +100,7 @@ public class RootInputInitializerManager {
     this.vertex = vertex;
     this.eventHandler = appContext.getEventHandler();
     this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+        .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
     this.entityStateTracker = stateTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index c5a3c35..d2801e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -182,6 +182,8 @@ public interface Vertex extends Comparable<Vertex> {
 
   public Configuration getConf();
 
+  public boolean isSpeculationEnabled();
+
   public int getTaskSchedulerIdentifier();
   public int getContainerLauncherIdentifier();
   public int getTaskCommunicatorIdentifier();

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index abcd98d..2f228bd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -782,10 +782,12 @@ public class TaskAttemptImpl implements TaskAttempt,
             );
       }
       if (oldState != getInternalState()) {
-          LOG.info(attemptId + " TaskAttempt Transitioned from "
-           + oldState + " to "
-           + getInternalState() + " due to event "
-           + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(attemptId + " TaskAttempt Transitioned from "
+              + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1116,7 +1118,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskSpec remoteTaskSpec;
       try {
         remoteTaskSpec = ta.createRemoteTaskSpec();
-        LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
+        }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
         LOG.error(msg, e);

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4d449d4..2f304c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -497,7 +497,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         int toEventId = actualMax + fromEventId;
         events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
-            + "-" + toEventId + ")");
+            + "-" + toEventId + ").");
         // currently not modifying the events so that we dont have to create
         // copies of events. e.g. if we have to set taskAttemptId into the TezEvent
         // destination metadata then we will need to create a copy of the TezEvent
@@ -756,12 +756,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Commit go/no-go request from " + taskAttemptID);
+      }
       TaskState state = getState();
       if (state == TaskState.SCHEDULED) {
         // the actual running task ran and is done and asking for commit. we are still stuck 
         // in the scheduled state which indicates a backlog in event processing. lets wait for the
         // backlog to clear. returning false will make the attempt come back to us.
-        LOG.debug("Event processing delay. "
+        LOG.info(
+            "Event processing delay. "
             + "Attempt committing before state machine transitioned to running : Task {}", taskId);
         return false;
       }
@@ -792,7 +796,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
       } else {
         if (commitAttempt.equals(taskAttemptID)) {
-          LOG.info(taskAttemptID + " given a go for committing the task output.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(taskAttemptID + " already given a go for committing the task output.");
+          }
           return true;
         }
         // Don't think this can be a pluggable decision, so simply raise an
@@ -800,9 +806,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // Wait for commit attempt to succeed. Dont kill this. If commit
         // attempt fails then choose a different committer. When commit attempt
         // succeeds then this and others will be killed
-        LOG.info(commitAttempt
-            + " is current committer. Commit waiting for:  "
-            + taskAttemptID);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(commitAttempt + " is current committer. Commit waiting for:  " + taskAttemptID);
+        }
         return false;
       }
 
@@ -810,7 +816,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-  
+
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
@@ -895,9 +901,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         internalError(event.getType());
       }
       if (oldState != getInternalState()) {
-        LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getInternalState() + " due to event "
-            + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1108,7 +1116,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
-            task.successfulAttempt + " has succeeded");
+              task.successfulAttempt + " has succeeded");
           String diagnostics = null;
           TaskAttemptTerminationCause errCause = null;
           if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
@@ -1466,7 +1474,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       } else {
         // nothing to do
         LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
-                 task.successfulAttempt + " is already successful");
+            task.successfulAttempt + " is already successful");
         return TaskStateInternal.SUCCEEDED;
       }
     }
@@ -1509,7 +1517,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
     if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
-      LOG.info("Removing commit attempt: " + commitAttempt);
+      LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed");
       commitAttempt = null;
     }
     if (attempt != null && !attempt.isFinished()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 946ec19..c9b4205 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -913,6 +913,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
+    this.logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
 
     this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;
@@ -971,6 +972,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     this.containerContext = new ContainerContext(this.localResources,
         appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
+    LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
 
     if (vertexPlan.getInputsCount() > 0) {
       setAdditionalInputs(vertexPlan.getInputsList());
@@ -993,7 +995,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
     }
     
-    logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
 
@@ -1033,16 +1035,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
     }
 
-    LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName +
-        ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName);
-
-    taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
-    taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
-    containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName);
-
-    Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName);
-    Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName);
-    Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName);
+    try {
+      taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
+    } catch (Exception e) {
+      LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName);
+      throw e;
+    }
+    try {
+      taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
+    } catch (Exception e) {
+      LOG.error("Failed to get index for taskCommunicator: " + taskCommName);
+      throw e;
+    }
+    try {
+      containerLauncherIdentifier =
+          appContext.getContainerLauncherIdentifier(containerLauncherName);
+    } catch (Exception e) {
+      LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
+      throw e;
+    }
 
     StringBuilder sb = new StringBuilder();
     sb.append("Running vertex: ").append(logIdentifier).append(" : ")
@@ -1076,7 +1087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return this.taskCommunicatorIdentifier;
   }
 
-  private boolean isSpeculationEnabled() {
+  @Override
+  public boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }
 
@@ -2110,29 +2122,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
+  private static String constructCheckTasksForCompletionLog(VertexImpl vertex) {
+    String logLine = vertex.logIdentifier
+        + ", tasks=" + vertex.numTasks
+        + ", failed=" + vertex.failedTaskCount
+        + ", killed=" + vertex.killedTaskCount
+        + ", success=" + vertex.succeededTaskCount
+        + ", completed=" + vertex.completedTaskCount
+        + ", commits=" + vertex.commitFutures.size()
+        + ", err=" + vertex.terminationCause;
+    return logLine;
+  }
+
   // triggered by task_complete
   static VertexState checkTasksForCompletion(final VertexImpl vertex) {
-
-    LOG.info("Checking tasks for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    // This log makes it quick to count task completions for a vertex;
+    // grepping per-attempt logs and accounting for retries is time consuming.
+    LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex));
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
       LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
-          + " for vertex " + vertex.logIdentifier
-          + ", numTasks=" + vertex.numTasks
-          + ", failedTaskCount=" + vertex.failedTaskCount
-          + ", killedTaskCount=" + vertex.killedTaskCount
-          + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount
-          + ", terminationCause=" + vertex.terminationCause);
+          + constructCheckTasksForCompletionLog(vertex));
     }
 
     if (vertex.completedTaskCount == vertex.tasks.size()) {
@@ -2141,7 +2151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
-        LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+        LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
         if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
           // start commit if there're commits or just finish if no commits
           return commitOrFinish(vertex);
@@ -2159,16 +2169,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   //triggered by commit_complete
   static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
-    LOG.info("Checking commits for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    LOG.info("Commits completion: "
+            + constructCheckTasksForCompletionLog(vertex));
     // terminationCause is null mean commit is succeeded, otherwise terminationCause will be set.
     if (vertex.terminationCause == null) {
       Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
@@ -2289,20 +2291,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   private void initializeCommitters() throws Exception {
     if (!this.additionalOutputSpecs.isEmpty()) {
-      LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+      LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
+          additionalOutputs.size());
       for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
           additionalOutputs.entrySet())  {
         final String outputName = entry.getKey();
         final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
         if (od.getControllerDescriptor() == null
             || od.getControllerDescriptor().getClassName() == null) {
-          LOG.info("Ignoring committer as none specified for output="
-              + outputName
-              + ", vertexId=" + logIdentifier);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring committer as none specified for output="
+                + outputName
+                + ", vertexId=" + logIdentifier);
+          }
           continue;
         }
         LOG.info("Instantiating committer for output=" + outputName
-            + ", vertexId=" + logIdentifier
+            + ", vertex=" + logIdentifier
             + ", committerClass=" + od.getControllerDescriptor().getClassName());
 
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -2319,12 +2324,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 .createClazzInstance(od.getControllerDescriptor().getClassName(),
                     new Class[]{OutputCommitterContext.class},
                     new Object[]{outputCommitterContext});
-            LOG.info("Invoking committer init for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer init for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.initialize();
             outputCommitters.put(outputName, outputCommitter);
-            LOG.info("Invoking committer setup for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer setup for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.setupOutput();
             return null;
           }
@@ -4034,8 +4043,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
-      LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
-          + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -4350,10 +4357,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           int numEventsSent = events.size() - numPreRoutedEvents;
           if (numEventsSent > 0) {
             StringBuilder builder = new StringBuilder();
-            builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
-            .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
-            .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
-            .append(getLogIdentifier());
+            builder.append("Sending ").append(attemptID).append(" ")
+                .append(numEventsSent)
+                .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
+                .append(") total ").append(currEventCount).append(" ")
+                .append(getLogIdentifier());
             LOG.info(builder.toString());
           }
         }
@@ -4644,9 +4652,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     for (String inputName : inputsWithInitializers) {
       inputList.add(rootInputDescriptors.get(inputName));
     }
-    LOG.info("Vertex will initialize via inputInitializers "
-        + logIdentifier + ". Starting root input initializers: "
-        + inputsWithInitializers.size());
+    LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
+        logIdentifier);
     initWaitsForRootInitializers = true;
     rootInputInitializerManager.runInputInitializers(inputList);
     // Send pending rootInputInitializerEvents
@@ -4730,6 +4737,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+    LOG.info("Setting " + inputs.size() + " additional inputs for vertex " + this.logIdentifier);
     this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
       addIO(input.getName());
@@ -4774,7 +4782,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
-    LOG.info("setting additional outputs for vertex " + this.vertexName);
+    LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier);
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index ab74382..d384aef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -127,7 +127,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
 
     @SuppressWarnings("unchecked")
     public synchronized void launch(ContainerLaunchRequest event) {
-      LOG.info("Launching Container with Id: " + event.getContainerId());
+      LOG.info("Launching " + event.getContainerId());
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
         sendContainerLaunchFailedMsg(event.getContainerId(),
@@ -185,8 +185,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
       } else {
-        LOG.info("Sending a stop request to the NM for ContainerId: "
-            + containerID);
+        LOG.info("Stopping " + containerID);
 
         ContainerManagementProtocolProxyData proxy = null;
         try {
@@ -353,6 +352,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.
       ContainerId containerID = event.getBaseOperation().getContainerId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing ContainerOperation {}", event);
+      }
 
       Container c = getContainer(event);
       switch(event.getOpType()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 04d7089..dbf8e38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -203,7 +203,9 @@ public class TaskSchedulerManager extends AbstractService implements
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    LOG.info("Processing the event " + sEvent.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing the event " + sEvent.toString());
+    }
     switch (sEvent.getType()) {
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
@@ -219,7 +221,7 @@ public class TaskSchedulerManager extends AbstractService implements
         handleTASucceeded(event);
         break;
       default:
-        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+        throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
       }
       break;
     case S_CONTAINER_DEALLOCATE:
@@ -366,8 +368,8 @@ public class TaskSchedulerManager extends AbstractService implements
               event);
           return;
         }
-        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
-            + " but no locality information exists for it. Ignoring hint.");
+        LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+            + taskAttempt.getID() + " Ignoring.");
         // fall through with null hosts/racks
       } else {
         hosts = (locationHint.getHosts() != null) ? locationHint
@@ -422,7 +424,8 @@ public class TaskSchedulerManager extends AbstractService implements
   @VisibleForTesting
   TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
                                         int schedulerId) {
-    LOG.info("Creating TaskScheduler: Local TaskScheduler");
+    LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}",
+        taskSchedulerContext.getCustomClusterIdentifier());
     return new LocalTaskSchedulerService(taskSchedulerContext);
   }
 
@@ -430,8 +433,8 @@ public class TaskSchedulerManager extends AbstractService implements
   TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
                                           NamedEntityDescriptor taskSchedulerDescriptor,
                                           int schedulerId) throws TezException {
-    LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
-        taskSchedulerDescriptor.getClassName());
+    LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", taskSchedulerDescriptor.getEntityName(),
+        taskSchedulerDescriptor.getClassName(), taskSchedulerContext.getCustomClusterIdentifier());
     return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(),
         new Class[]{TaskSchedulerContext.class},
         new Object[]{taskSchedulerContext});
@@ -450,8 +453,6 @@ public class TaskSchedulerManager extends AbstractService implements
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
       }
-      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
-          customAppIdIdentifier);
       taskSchedulers[i] = createTaskScheduler(host, port,
           trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
       taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index a1c4753..aaa6165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -35,6 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -329,7 +330,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
         "Heartbeats between preemptions should be >=1");
 
     delayedContainerManager = new DelayedContainerManager();
-    LOG.info("TaskScheduler initialized with configuration: " +
+    LOG.info("YarnTaskScheduler initialized with configuration: " +
             "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
             ", containerReuseEnabled: " + shouldReuseContainers +
             ", reuseRackLocal: " + reuseRackLocal +
@@ -407,8 +408,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
     if (isStopStarted.get()) {
-      for (ContainerStatus status : statuses) {
-        LOG.info("Container " + status.getContainerId() + " is completed");
+      if (LOG.isDebugEnabled()) {
+        for (ContainerStatus status : statuses) {
+          LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
+              status);
+        }
       }
       return;
     }
@@ -429,8 +433,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
           // being released
           // completion of a container we had released earlier
           // an allocated container completed. notify app
-          LOG.info("Released container completed:" + completedId +
-                   " last allocated to task: " + task);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Released container completed:" + completedId +
+                " last allocated to task: " + task);
+          }
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -446,9 +452,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
         }
         if(task != null) {
           // completion of a container we have allocated currently
-          // an allocated container completed. notify app
-          LOG.info("Allocated container completed:" + completedId +
-                   " last allocated to task: " + task);
+          // an allocated container completed. notify app. This will cause attempt to get killed
+          LOG.info(
+              "Allocated container completed:" + completedId + " last allocated to task: " + task);
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -467,9 +473,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
   @Override
   public void onContainersAllocated(List<Container> containers) {
     if (isStopStarted.get()) {
-      for (Container container : containers) {
-        LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
-        releaseContainer(container.getId());
+      LOG.info("Ignoring container allocations because application is shutting down. Num " + 
+          containers.size());
+      for (Container container : containers) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
+        }
+        releaseContainer(container.getId()); // always release; only the log line is debug-gated
       }
       return;
     }
@@ -528,6 +538,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
     }
 
     // Release any unassigned containers given by the RM
+    if (containers.iterator().hasNext()) {
+      LOG.info("Releasing newly assigned containers which could not be allocated");
+    }
     releaseUnassignedContainers(containers);
 
     return assignedContainers;
@@ -581,15 +594,15 @@ public class YarnTaskSchedulerService extends TaskScheduler
     boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Trying to assign a delayed container"
-        + ", containerId=" + heldContainer.getContainer().getId()
-        + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
-        + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
-        + ", AMState=" + state
-        + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
-        + ", taskRequestsCount=" + taskRequests.size()
-        + ", heldContainers=" + heldContainers.size()
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", isNew=" + isNew);
+          + ", containerId=" + heldContainer.getContainer().getId()
+          + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+          + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+          + ", AMState=" + state
+          + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+          + ", taskRequestsCount=" + taskRequests.size()
+          + ", heldContainers=" + heldContainers.size()
+          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+          + ", isNew=" + isNew);
     }
 
     if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) {
@@ -637,7 +650,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
             + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
             + ", isNew=" + isNew);
           releaseUnassignedContainers(
-              Lists.newArrayList(heldContainer.getContainer()));        
+              Collections.singletonList((heldContainer.getContainer())));        
       } else {
         // no outstanding work and container idle timeout not expired
         if (LOG.isDebugEnabled()) {
@@ -690,7 +703,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
         assignReUsedContainerWithLocation(containerToAssign,
             NODE_LOCAL_ASSIGNER, assignedContainers, true);
         if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-          LOG.info("Failed to assign tasks to delayed container using node"
+          LOG.debug("Failed to assign tasks to delayed container using node"
             + ", containerId=" + heldContainer.getContainer().getId());
         }
       }
@@ -706,7 +719,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
           assignReUsedContainerWithLocation(containerToAssign,
               RACK_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using rack"
+            LOG.debug("Failed to assign tasks to delayed container using rack"
               + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -722,7 +735,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
          assignReUsedContainerWithLocation(containerToAssign,
               NON_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using non-local"
+            LOG.debug("Failed to assign tasks to delayed container using non-local"
                 + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -744,10 +757,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
         if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
           && idleContainerTimeoutMin != -1) {
           LOG.info("Container's idle timeout expired. Releasing container"
-            + ", containerId=" + heldContainer.container.getId()
-            + ", containerExpiryTime="
-            + heldContainer.getContainerExpiryTime()
-            + ", idleTimeoutMin=" + idleContainerTimeoutMin);
+              + ", containerId=" + heldContainer.container.getId()
+              + ", containerExpiryTime="
+              + heldContainer.getContainerExpiryTime()
+              + ", idleTimeoutMin=" + idleContainerTimeoutMin);
           releaseUnassignedContainers(
             Lists.newArrayList(heldContainer.container));
         } else {
@@ -794,11 +807,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
             if (safeToRelease && 
                 (!taskRequests.isEmpty() || !getContext().isSession())) {
               LOG.info("Releasing held container as either there are pending but "
-                + " unmatched requests or this is not a session"
-                + ", containerId=" + heldContainer.container.getId()
-                + ", pendingTasks=" + taskRequests.size()
-                + ", isSession=" + getContext().isSession()
-                + ". isNew=" + isNew);
+                  + " unmatched requests or this is not a session"
+                  + ", containerId=" + heldContainer.container.getId()
+                  + ", pendingTasks=" + taskRequests.size()
+                  + ", isSession=" + getContext().isSession()
+                  + ". isNew=" + isNew);
               releaseUnassignedContainers(
                 Lists.newArrayList(heldContainer.container));
             } else {
@@ -873,8 +886,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
       // TODO this will not handle dynamic changes in resources
       totalResources = Resources.clone(getAvailableResources());
       LOG.info("App total resource memory: " + totalResources.getMemory() +
-               " cpu: " + totalResources.getVirtualCores() +
-               " taskAllocations: " + taskAllocations.size());
+          " cpu: " + totalResources.getVirtualCores() +
+          " taskAllocations: " + taskAllocations.size());
     }
 
     numHeartbeats++;
@@ -973,9 +986,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
     // See if any of the delayedContainers can be used for this task.
     delayedContainerManager.triggerScheduling(true);
     LOG.info("Allocation request for task: " + task +
-      " with request: " + request + 
-      " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
-      " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
+          " with request: " + request +
+          " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
+          " rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
   }
 
   /**
@@ -1008,8 +1021,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
         LOG.info("Ignoring removal of unknown task: " + task);
         return false;
       } else {
-        LOG.info("Deallocated task: " + task + " from container: "
-            + container.getId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deallocated task: " + task + " from container: "
+              + container.getId());
+        }
 
         if (!taskSucceeded || !shouldReuseContainers) {
           if (LOG.isDebugEnabled()) {
@@ -1029,6 +1044,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
             }
             assignedContainers = assignDelayedContainer(heldContainer);
           } else {
+            // this is a non standard situation
             LOG.info("Skipping container after task deallocate as container is"
                 + " no longer running, containerId=" + container.getId());
           }
@@ -1047,8 +1063,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
   public synchronized Object deallocateContainer(ContainerId containerId) {
     Object task = unAssignContainer(containerId, true);
     if(task != null) {
+      // non-standard case for the app layer to deallocate container
       LOG.info("Deallocated container: " + containerId +
-        " from task: " + task);
+          " from task: " + task);
       return task;
     }
 
@@ -1058,9 +1075,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
 
   @Override
   public synchronized void initiateStop() {
-    LOG.info("Initiate stop to YarnTaskScheduler");
+    LOG.info("Initiating stop of YarnTaskScheduler");
     // release held containers
-    LOG.info("Release held containers");
+    LOG.info("Releasing held containers");
     isStopStarted.set(true);
     // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
     // because method releaseContainer will change heldContainers.
@@ -1073,7 +1090,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
     }
 
     // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
-    LOG.info("Remove all the taskRequests");
+    LOG.info("Removing all pending taskRequests");
     // Create a new list for tasks to avoid ConcurrentModificationException
     List<Object> tasks = new ArrayList<Object>(taskRequests.size());
     for (Object task : taskRequests.keySet()) {
@@ -1634,8 +1651,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
 
   private void releaseUnassignedContainers(Iterable<Container> containers) {
     for (Container container : containers) {
-      LOG.info("Releasing unused container: "
-          + container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Releasing unused container: " + container.getId());
+      }
       releaseContainer(container.getId());
     }
   }
@@ -1704,19 +1722,17 @@ public class YarnTaskSchedulerService extends TaskScheduler
       Object task = getTask(assigned);
       assert task != null;
 
-      LOG.info("Assigning container to task"
-        + ", container=" + container
+      LOG.info("Assigning container to task: "
+        + "containerId=" + container.getId()
         + ", task=" + task
-        + ", containerHost=" + container.getNodeId().getHost()
+        + ", containerHost=" + container.getNodeId()
+        + ", containerPriority=" + container.getPriority()
+        + ", containerResources=" + container.getResource()
         + ", localityMatchType=" + locality
         + ", matchedLocation=" + matchedLocation
         + ", honorLocalityFlags=" + honorLocalityFlags
-        + ", reusedContainer="
-        + containerAssignments.containsKey(container.getId())
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", containerResourceMemory=" + container.getResource().getMemory()
-        + ", containerResourceVCores="
-        + container.getResource().getVirtualCores());
+        + ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
 
       assignContainer(task, container, assigned);
     }
@@ -1904,6 +1920,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
                 heldContainers.get(delayedContainer.getContainer().getId())) {
               assignedContainers = assignDelayedContainer(delayedContainer);
             } else {
+              // non standard scenario
               LOG.info("Skipping delayed container as container is no longer"
                   + " running, containerId="
                   + delayedContainer.getContainer().getId());
@@ -1958,9 +1975,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
           HeldContainer delayedContainer = iter.next();
           if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
             // this container is no longer held by us
+            // non standard scenario
             LOG.info("AssignAll - Skipping delayed container as container is no longer"
-                + " running, containerId="
-                + delayedContainer.getContainer().getId());
+                  + " running, containerId="
+                  + delayedContainer.getContainer().getId());
             iter.remove();
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fa56..11b5006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -113,8 +113,10 @@ public class AMContainerHelpers {
       // correctly, even though they may not be used by all tasks which will run
       // on this container.
 
-      LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
-          + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC");
+      }
       containerCredentials.addAll(credentials);
 
       DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
@@ -123,7 +125,9 @@ public class AMContainerHelpers {
           containerTokens_dob.getLength());
 
       // Add shuffle token
-      LOG.info("Putting shuffle token in serviceData");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Putting shuffle token in serviceData in common CLC");
+      }
       serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
           TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 69c21d4..d37d106 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -421,9 +421,11 @@ public class AMContainerImpl implements AMContainer {
         // TODO Can't set state to COMPLETED. Add a default error state.
       }
       if (oldState != getState()) {
-        LOG.info("AMContainer " + this.containerId + " transitioned from "
-            + oldState + " to " + getState()
-            + " via event " + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("AMContainer " + this.containerId + " transitioned from "
+              + oldState + " to " + getState()
+              + " via event " + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -474,8 +476,10 @@ public class AMContainerImpl implements AMContainer {
       // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
       container.sendStartRequestToNM(clc);
-      LOG.info("Sending Launch Request for Container with id: " +
-          container.container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending Launch Request for Container with id: " +
+            container.container.getId());
+      }
     }
   }
 
@@ -533,7 +537,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.deAllocate();
-      LOG.info(
+      LOG.warn(
           "Unexpected event type: " + cEvent.getType() + " while in state: " +
               container.getState() + ". Event: " + cEvent);
 
@@ -597,8 +601,6 @@ public class AMContainerImpl implements AMContainer {
         }
       }
 
-      LOG.info("Assigned taskAttempt + [" + container.currentAttempt +
-          "] to container: [" + container.getContainerId() + "]");
       AMContainerTask amContainerTask = new AMContainerTask(
           event.getRemoteTaskSpec(), container.additionalLocalResources,
           container.credentialsChanged ? container.credentials : null, container.credentialsChanged,

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index 3264708..b1c81af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -95,6 +95,8 @@ public class PerSourceNodeTracker {
         AMNode amNode = nodeMap.get(nodeId);
         if (amNode == null) {
           LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+          // This implies that the node exists on the cluster, but is not running a container for
+          // this application.
         } else {
           amNode.handle(rEvent);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 9e275a2..e17a4d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing HistoryEventHandler");
-
     this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
         TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
 
@@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService {
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
 
+    LOG.info("Initializing HistoryEventHandler with "
+        + "recoveryEnabled=" + recoveryEnabled
+        + ", historyServiceClassName=" + historyServiceClassName);
+
     historyLoggingService =
         ReflectionUtils.createClazzInstance(historyServiceClassName);
     historyLoggingService.setAppContext(context);
@@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService {
       addService(recoveryService);
     }
     super.serviceInit(conf);
+
   }
 
   @Override
   public void serviceStart() throws Exception {
-    LOG.info("Starting HistoryEventHandler");
     super.serviceStart();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 9f24151..7d83db2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
 
   @Override
   public String toString() {
+    String counterStr = "";
+    if (state != TaskAttemptState.SUCCEEDED) {
+      counterStr = ", counters=" + ( tezCounters == null ? "null" :
+        tezCounters.toString()
+        .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+    }
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
         + ", creationTime=" + creationTime
@@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
-        + ", lastDataEventSourceTA=" + 
-              ((dataEvents==null) ? 0:dataEvents.size())
-        + ", counters=" + (tezCounters == null ? "null" :
-          tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+        + counterStr;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a58b49e..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", taskAttemptId=" + taskAttemptId
         + ", startTime=" + launchTime
         + ", containerId=" + containerId
-        + ", nodeId=" + nodeId
-        + ", inProgressLogs=" + inProgressLogsUrl
-        + ", completedLogs=" + completedLogsUrl;
+        + ", nodeId=" + nodeId;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8852e02..4372d8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     if (loggingDisabled) {
       return;
     }
-    LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    }
     try {
       try {
         JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d870645..2fe0e6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing RecoveryService");
     recoveryPath = appContext.getCurrentRecoveryDir();
     recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
     bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
@@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService {
     drainEventsFlag = conf.getBoolean(
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
+
+    LOG.info("RecoveryService initialized with "
+      + "recoveryPath=" + recoveryPath
+      + ", bufferSize(bytes)=" + bufferSize
+      + ", flushInterval(s)=" + flushInterval
+      + ", maxUnflushedEvents=" + maxUnflushedEvents);
   }
 
   @Override
   public void serviceStart() {
-    LOG.info("Starting RecoveryService");
     lastFlushTime = appContext.getClock().getTime();
     eventHandlingThread = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 7a2aeab..c53994e 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
 log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
 
 log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
 
 #
 # Event Counter Appender

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 69237d4..1b66c8e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter {
     if (jobConf.getBoolean("mapred.reducer.new-api", false)
         || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
-      LOG.info("Using mapred newApiCommitter.");
     }
+    LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
+        " using " + (newApiCommitter ? "new" : "old") + " mapred API");
 
     if (newApiCommitter) {
       TaskAttemptID taskAttemptID = new TaskAttemptID(

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbc7748..b93e4ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws Exception {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());
     }
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
     Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
     
     sendSerializedEvents = conf.getBoolean(
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
-    LOG.info("Emitting serialized splits: " + sendSerializedEvents);
+
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
+      LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " +
+          getContext().getInputName());
       LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
     }
 
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
 
     int totalResource = getContext().getTotalAvailableResource().getMemory();
     int taskResource = getContext().getVertexTaskResource().getMemory();
@@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
     int numTasks = (int)((totalResource*waves)/taskResource);
 
+
+
+    boolean groupSplits = userPayloadProto.getGroupingEnabled();
     LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
         + " tasks. Headroom: " + totalResource + " Task Resource: "
-        + taskResource + " waves: " + waves);
+        + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits);
 
     // Read all credentials into the credentials instance stored in JobConf.
     JobConf jobConf = new JobConf(conf);
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
 
     InputSplitInfoMem inputSplitInfo = null;
-    boolean groupSplits = userPayloadProto.getGroupingEnabled();
+
     if (groupSplits) {
-      LOG.info("Grouping input splits");
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
     } else {
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index e6b70d2..28d108e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());  
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 7f5e0e3..30e4a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -284,21 +284,27 @@ public class MRInputHelpers {
     InputSplitInfoMem splitInfoMem = null;
     JobConf jobConf = new JobConf(conf);
     if (jobConf.getUseNewMapper()) {
-      LOG.info("Generating mapreduce api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapreduce api input splits");
+      }
       Job job = Job.getInstance(conf);
       org.apache.hadoop.mapreduce.InputSplit[] splits =
           generateNewSplits(job, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, job.getCredentials(), job.getConfiguration());
     } else {
-      LOG.info("Generating mapred api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapred api input splits");
+      }
       org.apache.hadoop.mapred.InputSplit[] splits =
           generateOldSplits(jobConf, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, jobConf.getCredentials(), jobConf);
     }
-    LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
-        + splitInfoMem.getSplitsProto().getSerializedSize());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+          + splitInfoMem.getSplitsProto().getSerializedSize());
+    }
     return splitInfoMem;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 720af50..80828d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     if (useNewApi) {
       oldPartitioner = null;
       if (partitions > 1) {
+        Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz =
+            (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                    org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
+        LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName());
         newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
-            .newInstance(
-                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
-                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
-                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
+            .newInstance(clazz, conf);
       } else {
         newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
           @Override
@@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     } else {
       newPartitioner = null;
       if (partitions > 1) {
-        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+        Class<? extends org.apache.hadoop.mapred.Partitioner> clazz =
             (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
-                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class),
-            new JobConf(conf));
+                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class);
+        LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName());
+        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+            clazz, new JobConf(conf));
       } else {
         oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
           @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index d0e935f..6ea21e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
           + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
       return;
     }
-    LOG.info("Initializing ATSService");
 
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
       YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
@@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     }
     sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
-    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+      + "maxEventsPerBatch=" + maxEventsPerBatch
+      + ", maxPollingTime(ms)=" + maxPollingTimeMillis
+      + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown
+      + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);
+
     try {
       historyACLPolicyManager = ReflectionUtils.createClazzInstance(
           atsHistoryACLManagerClassName);
@@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
-    LOG.info("Starting ATSService");
     timelineClient.start();
 
     eventHandlingThread = new Thread(new Runnable() {