You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/17 00:20:19 UTC

[1/2] tez git commit: TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 fd7c2fc13 -> e15faa56c


http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index fb8b530..92035e1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -202,11 +202,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
    * @throws Exception
    */
   public void initialize() throws Exception {
-    LOG.info("Initializing LogicalProcessorIORuntimeTask");
     Preconditions.checkState(this.state.get() == State.NEW, "Already initialized");
     this.state.set(State.INITED);
 
-    LOG.info("Creating processor" + ", processorClassName=" + processorDescriptor.getClassName());
     this.processorContext = createProcessorContext();
     this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
 
@@ -406,7 +404,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
-      LOG.info("Initializing Input using InputSpec: " + inputSpec);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initializing Input using InputSpec: " + inputSpec);
+      }
       String edgeName = inputSpec.getSourceVertexName();
       InputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex);
       LogicalInput input = createInput(inputSpec, inputContext);
@@ -414,13 +414,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       inputsMap.put(edgeName, input);
       inputContextMap.put(edgeName, inputContext);
 
-      LOG.info("Initializing Input with src edge: " + edgeName);
       List<Event> events = ((InputFrameworkInterface)input).initialize();
       sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
           inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
           taskSpec.getTaskAttemptID());
       initializedInputs.put(edgeName, input);
-      LOG.info("Initialized Input with src edge: " + edgeName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initialized Input with src edge: " + edgeName);
+      }
       return null;
     }
   }
@@ -436,7 +437,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
-      LOG.info("Starting Input with src edge: " + srcVertexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting Input with src edge: " + srcVertexName);
+      }
+
       input.start();
       LOG.info("Started Input with src edge: " + srcVertexName);
       return null;
@@ -455,7 +459,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
-      LOG.info("Initializing Output using OutputSpec: " + outputSpec);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initializing Output using OutputSpec: " + outputSpec);
+      }
       String edgeName = outputSpec.getDestinationVertexName();
       OutputContext outputContext = createOutputContext(outputSpec, outputIndex);
       LogicalOutput output = createOutput(outputSpec, outputContext);
@@ -463,13 +469,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       outputsMap.put(edgeName, output);
       outputContextMap.put(edgeName, outputContext);
 
-      LOG.info("Initializing Output with dest edge: " + edgeName);
       List<Event> events = ((OutputFrameworkInterface)output).initialize();
       sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
           outputContext.getTaskVertexName(),
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
       initializedOutputs.put(edgeName, output);
-      LOG.info("Initialized Output with dest edge: " + edgeName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initialized Output with dest edge: " + edgeName);
+      }
       return null;
     }
   }
@@ -486,7 +493,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
-        LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
+       }
        MergedInputContext mergedInputContext =
            new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(),
                groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs);
@@ -505,11 +514,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private void initializeLogicalIOProcessor() throws Exception {
-    LOG.info("Initializing processor" + ", processorClassName="
-        + processorDescriptor.getClassName());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing processor" + ", processorClassName="
+          + processorDescriptor.getClassName());
+    }
     processor.initialize();
-    LOG.info("Initialized processor" + ", processorClassName="
-        + processorDescriptor.getClassName());
+    LOG.info("Initialized processor");
   }
 
   private InputContext createInputContext(Map<String, LogicalInput> inputMap,
@@ -556,7 +566,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException {
-    LOG.info("Creating Input");
     InputDescriptor inputDesc = inputSpec.getInputDescriptor();
     Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(),
         new Class[]{InputContext.class, Integer.TYPE},
@@ -579,7 +588,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException {
-    LOG.info("Creating Output");
     OutputDescriptor outputDesc = outputSpec.getOutputDescriptor();
     Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(),
         new Class[]{OutputContext.class, Integer.TYPE},
@@ -704,7 +712,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             if (e == null) {
               continue;
             }
-            // TODO TODONEWTEZ
             if (!handleEvent(e)) {
               LOG.warn("Stopping Event Router thread as failed to handle"
                   + " event: " + e);
@@ -798,7 +805,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
             srcVertexName, e.getClass().getName(), e.getMessage());
       } finally {
-        LOG.info("Close input for vertex={}, sourceVertex={}", processor
+        LOG.info("Closed input for vertex={}, sourceVertex={}", processor
             .getContext().getTaskVertexName(), srcVertexName);
       }
     }
@@ -816,7 +823,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
             destVertexName, e.getClass().getName(), e.getMessage());
       } finally {
-        LOG.info("Close input for vertex={}, sourceVertex={}", processor
+        LOG.info("Closed input for vertex={}, sourceVertex={}", processor
             .getContext().getTaskVertexName(), destVertexName);
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 8d6466a..4431150 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -176,6 +176,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
     super.close();
     this.userPayload = null;
     this.inputReadyTracker = null;
-    LOG.info("Cleared TezInputContextImpl related information");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleared TezInputContextImpl related information");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 71e96db..1e5b6a5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -156,6 +156,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   public void close() throws IOException {
     super.close();
     this.userPayload = null;
-    LOG.info("Cleared TezOutputContextImpl related information");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleared TezOutputContextImpl related information");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a191ae8..6dc30ff 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -121,7 +121,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
     super.close();
     this.userPayload = null;
     this.inputReadyTracker = null;
-    LOG.info("Cleared TezProcessorContextImpl related information");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleared TezProcessorContextImpl related information");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index 2622b1f..c822357 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -61,6 +61,7 @@ public class MemoryDistributor {
 
   private long totalJvmMemory;
   private final boolean isEnabled;
+  private final String allocatorClassName;
   private final Set<TaskContext> dupSet = Collections
       .newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>());
   private final List<RequestorInfo> requestList;
@@ -77,7 +78,13 @@ public class MemoryDistributor {
     this.conf = conf;
     isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
         TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
-    
+
+    if (isEnabled) {
+      allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+          TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT);
+    } else {
+      allocatorClassName = null;
+    }
 
     this.numTotalInputs = numTotalInputs;
     this.numTotalOutputs = numTotalOutputs;
@@ -85,7 +92,8 @@ public class MemoryDistributor {
     this.requestList = Collections.synchronizedList(new LinkedList<RequestorInfo>());
     LOG.info("InitialMemoryDistributor (isEnabled=" + isEnabled + ") invoked with: numInputs="
         + numTotalInputs + ", numOutputs=" + numTotalOutputs
-        + ", JVM.maxFree=" + totalJvmMemory);
+        + ", JVM.maxFree=" + totalJvmMemory
+        + ", allocatorClassName=" + allocatorClassName);
   }
 
 
@@ -97,7 +105,7 @@ public class MemoryDistributor {
       TaskContext taskContext, EntityDescriptor<?> descriptor) {
     registerRequest(requestSize, callback, taskContext, descriptor);
   }
-  
+
   /**
    * Used by the Tez framework to distribute initial memory after components
    * have made their initial requests.
@@ -106,6 +114,9 @@ public class MemoryDistributor {
   public void makeInitialAllocations() throws TezException {
     Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory");
     Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory");
+
+    logInitialRequests(requestList);
+
     Iterable<InitialMemoryRequestContext> requestContexts = Iterables.transform(requestList,
         new Function<RequestorInfo, InitialMemoryRequestContext>() {
           public InitialMemoryRequestContext apply(RequestorInfo requestInfo) {
@@ -121,14 +132,12 @@ public class MemoryDistributor {
         }
       });
     } else {
-      String allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
-          TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT);
-      LOG.info("Using Allocator class: " + allocatorClassName);
       InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName);
       allocator.setConf(conf);
       allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs,
           Iterables.unmodifiableIterable(requestContexts));
       validateAllocations(allocations, requestList.size());
+      logFinalAllocations(allocations, requestList);
     }
 
     // Making the callbacks directly for now, instead of spawning threads. The
@@ -137,14 +146,18 @@ public class MemoryDistributor {
     Iterator<Long> allocatedIter = allocations.iterator();
     for (RequestorInfo rInfo : requestList) {
       long allocated = allocatedIter.next();
-      LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", "
-          + rInfo.getRequestContext().getComponentVertexName() + ", "
-          + rInfo.getRequestContext().getComponentClassName() + ": requested="
-          + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated);
+      if (LOG.isDebugEnabled()) {
+        LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", "
+            + rInfo.getRequestContext().getComponentVertexName() + ", "
+            + rInfo.getRequestContext().getComponentClassName() + ": requested="
+            + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated);
+      }
       rInfo.getCallback().memoryAssigned(allocated);
     }
   }
 
+
+
   /**
    * Allow tests to set memory.
    * @param size
@@ -233,8 +246,6 @@ public class MemoryDistributor {
       this.requestContext = new InitialMemoryRequestContext(requestSize, descriptor.getClassName(),
           type, componentVertexName);
       this.callback = callback;
-      LOG.info("Received request: " + requestSize + ", type: " + type + ", componentVertexName: "
-          + componentVertexName);
     }
 
     public MemoryUpdateCallback getCallback() {
@@ -246,4 +257,45 @@ public class MemoryDistributor {
     }
   }
 
+
+  private void logInitialRequests(List<RequestorInfo> initialRequests) {
+    if (initialRequests != null && !initialRequests.isEmpty()) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < initialRequests.size(); i++) {
+        InitialMemoryRequestContext context = initialRequests.get(i).getRequestContext();
+        sb.append("[");
+        sb.append(context.getComponentVertexName()).append(":");
+        sb.append(context.getComponentType()).append(":");
+        sb.append(context.getRequestedSize()).append(":").append(context.getComponentClassName());
+        sb.append("]");
+        if (i < initialRequests.size() - 1) {
+          sb.append(", ");
+        }
+      }
+      LOG.info("InitialRequests=" + sb.toString());
+    }
+  }
+
+  private void logFinalAllocations(Iterable<Long> allocations, List<RequestorInfo> requestList) {
+    if (requestList != null && !requestList.isEmpty()) {
+      Iterator<Long> allocatedIter = allocations.iterator();
+      StringBuilder sb = new StringBuilder();
+
+      for (int i = 0 ; i < requestList.size() ; i++) {
+        long allocated = allocatedIter.next();
+        InitialMemoryRequestContext context = requestList.get(i).getRequestContext();
+        sb.append("[");
+        sb.append(context.getComponentVertexName()).append(":");
+        sb.append(context.getComponentClassName()).append(":");
+        sb.append(context.getComponentType()).append(":");
+        sb.append(context.getRequestedSize()).append(":").append(allocated);
+        sb.append("]");
+        if (i < requestList.size() - 1) {
+          sb.append(", ");
+        }
+      }
+      LOG.info("Allocations=" + sb.toString());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
index ebb94c6..2472c51 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -44,7 +44,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 public class TaskCounterUpdater {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskCounterUpdater.class);
-  
+
   private final TezCounters tezCounters;
   private final Configuration conf;
 
@@ -149,6 +149,6 @@ public class TaskCounterUpdater {
 
     pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, conf);
 
-    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+    LOG.info("Using ResourceCalculatorProcessTree : " + clazz.getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index 8ee30c5..0ece227 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -40,7 +40,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> {
   private final TezTaskUmbilicalProtocol umbilical;
   private final ContainerContext containerContext;
   private final int getTaskMaxSleepTime;
-  private final long LOG_INTERVAL = 2000l;
+  private final long LOG_INTERVAL = 30000l;
 
   private long nextGetTaskPrintTime;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 062b497..7f03992 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -287,10 +287,10 @@ public class TezChild {
     Preconditions.checkState(!containerTask.shouldDie());
     Preconditions.checkState(containerTask.getTaskSpec() != null);
     if (containerTask.haveCredentialsChanged()) {
-      LOG.info("Refreshing UGI since Credentials have changed");
       Credentials taskCreds = containerTask.getCredentials();
       if (taskCreds != null) {
-        LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys="
+        LOG.info("Refreshing UGI since Credentials have changed. Credentials : #Tokens=" +
+            taskCreds.numberOfTokens() + ", #SecretKeys="
             + taskCreds.numberOfSecretKeys());
         childUGI = UserGroupInformation.createRemoteUser(user);
         childUGI.addCredentials(containerTask.getCredentials());
@@ -315,20 +315,20 @@ public class TezChild {
       LOG.debug("Additional Resources added to container: " + additionalResources);
     }
 
-    LOG.info("Localizing additional local resources for Task : " + additionalResources);
-    List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
-        Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
-          @Override
-          public URI apply(TezLocalResource input) {
-            return input.getUri();
-          }
-        }), defaultConf, workingDir);
-    RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
 
-    LOG.info("Done localizing additional resources");
-    final TaskSpec taskSpec = containerTask.getTaskSpec();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("New container task context:" + taskSpec.toString());
+    if (additionalResources != null && !additionalResources.isEmpty()) {
+      LOG.info("Localizing additional local resources for Task : " + additionalResources);
+
+      List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+          Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+            @Override
+            public URI apply(TezLocalResource input) {
+              return input.getUri();
+            }
+          }), defaultConf, workingDir);
+      RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+      LOG.info("Done localizing additional resources");
     }
   }
 
@@ -456,7 +456,8 @@ public class TezChild {
     final Configuration defaultConf = new Configuration();
 
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    LOG.info("TezChild starting");
+    final String pid = System.getenv().get("JVM_PID");
+
 
     assert args.length == 5;
     String host = args[0];
@@ -466,8 +467,7 @@ public class TezChild {
     final int attemptNumber = Integer.parseInt(args[4]);
     final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
         .name()));
-    final String pid = System.getenv().get("JVM_PID");
-    LOG.info("PID, containerIdentifier:  " + pid + ", " + containerIdentifier);
+    LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
           + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index ad0eaf9..69436ba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -62,7 +62,6 @@ public class TezRuntimeUtils {
     Class<? extends Combiner> clazz;
     String className = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
     if (className == null) {
-      LOG.info("No combiner specified via " + TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS + ". Combiner will not be used");
       return null;
     }
     LOG.info("Using Combiner class: " + className);

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index b70c9d7..8477300 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -168,7 +168,6 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
     Request request = new Request(context.getComponentClassName(), context.getRequestedSize(),
         requestType, typeScaleFactor);
     requests.add(request);
-    LOG.info("ScaleFactor: " + typeScaleFactor + ", for type: " + requestType);
     numRequestsScaled += typeScaleFactor;
   }
 
@@ -194,7 +193,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
       requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT;
     } else {
       requestType = RequestType.OTHER;
-      LOG.info("Falling back to RequestType.OTHER for class: " + className);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Falling back to RequestType.OTHER for class: " + className);
+      }
     }
     return requestType;
   }
@@ -219,6 +220,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
       }
     }
 
+    StringBuilder sb = new StringBuilder();
     Set<RequestType> seenTypes = new HashSet<RequestType>();
 
     for (String ratio : ratios) {
@@ -232,7 +234,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
       }
       Preconditions.checkState(ratioVal >= 0, "Ratio must be >= 0");
       typeScaleMap.put(requestType, ratioVal);
+      sb.append("[").append(requestType).append(":").append(ratioVal).append("]");
     }
+    LOG.info("ScaleRatiosUsed=" + sb.toString());
   }
 
   private double computeReservedFraction(int numTotalRequests) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
index 6b8bd0d..bf3e9db 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -151,9 +151,7 @@ public class RPCLoadGen extends TezExampleBase {
       random.nextBytes(diskPayload);
       fs = FileSystem.get(conf);
       resourcePath = new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME);
-      System.err.println("ZZZ: HDFSPath: " + resourcePath);
       resourcePath = fs.makeQualified(resourcePath);
-      System.err.println("ZZZ: HDFSPathResolved: " + resourcePath);
       FSDataOutputStream dataOut = fs.create(resourcePath, true);
       dataOut.write(diskPayload);
       dataOut.close();


[2/2] tez git commit: TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)

Posted by ss...@apache.org.
TEZ-2830. Backport TEZ-2774 to branch-0.7. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e15faa56
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e15faa56
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e15faa56

Branch: refs/heads/branch-0.7
Commit: e15faa56c5a1a9ab678fc15583189be8882228f1
Parents: fd7c2fc
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 16 15:20:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 16 15:20:01 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/AsyncDispatcher.java  |  14 +-
 .../tez/common/AsyncDispatcherConcurrent.java   |   9 +-
 .../org/apache/tez/common/TezUtilsInternal.java |  14 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  27 +++-
 .../app/dag/RootInputInitializerManager.java    |   2 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  14 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  34 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  99 ++++++------
 .../dag/app/launcher/ContainerLauncherImpl.java |   9 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  10 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    | 155 +++++++++++--------
 .../app/rm/container/AMContainerHelpers.java    |  10 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  18 ++-
 .../tez/dag/app/rm/node/AMNodeTracker.java      |   2 +
 .../tez/dag/history/HistoryEventHandler.java    |   8 +-
 .../events/TaskAttemptFinishedEvent.java        |  12 +-
 .../history/events/TaskAttemptStartedEvent.java |   4 +-
 .../impl/SimpleHistoryLoggingService.java       |   4 +-
 .../dag/history/recovery/RecoveryService.java   |   8 +-
 .../resources/tez-container-log4j.properties    |   2 +-
 .../mapreduce/committer/MROutputCommitter.java  |   3 +-
 .../common/MRInputAMSplitGenerator.java         |  31 ++--
 .../common/MRInputSplitDistributor.java         |   7 +-
 .../tez/mapreduce/hadoop/MRInputHelpers.java    |  14 +-
 .../tez/mapreduce/partition/MRPartitioner.java  |  18 ++-
 .../logging/ats/ATSHistoryLoggingService.java   |   9 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  45 +++---
 .../runtime/api/impl/TezInputContextImpl.java   |   4 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   4 +-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../common/resources/MemoryDistributor.java     |  76 +++++++--
 .../tez/runtime/metrics/TaskCounterUpdater.java |   4 +-
 .../tez/runtime/task/ContainerReporter.java     |   2 +-
 .../org/apache/tez/runtime/task/TezChild.java   |  36 ++---
 .../runtime/library/common/TezRuntimeUtils.java |   1 -
 .../WeightedScalingMemoryDistributor.java       |   8 +-
 .../tez/mapreduce/examples/RPCLoadGen.java      |   2 -
 39 files changed, 437 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c8bde4..3e56d6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2830. Backport TEZ-2774 to branch-0.7. Improvements to logging in the AM and part of the runtime.
   TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
   TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
index 4319f4f..159ccd9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java
@@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
   @Override
   protected void serviceStart() throws Exception {
     eventHandlingThread = new Thread(createThread());
-    eventHandlingThread.setName("Dispatcher thread: " + name);
+    eventHandlingThread.setName("Dispatcher thread {" + name + "}");
     eventHandlingThread.start();
     
     //start all the components
@@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
 
   private void checkForExistingConcurrentDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType);
-    Preconditions.checkState(concurrentDispatcher == null, 
+    Preconditions.checkState(concurrentDispatcher == null,
         "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName());
   }
   
@@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
@@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher {
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+    LOG.info("Registering " + eventType + " with existing concurrent dispatch using: "
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     concurrentEventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
index d19bf9e..321ea8b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
+++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java
@@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
   @Override
   protected void serviceStart() throws Exception {
     execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Dispatcher [" + this.name + "] #%d").build());
+        .setNameFormat("Dispatcher {" + this.name + "} #%d").build());
     for (int i=0; i<numThreads; ++i) {
       eventQueues.add(new LinkedBlockingQueue<Event>());
     }
@@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
 
   private void checkForExistingDispatcher(Class<? extends Enum> eventType) {
     AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType);
-    Preconditions.checkState(registeredDispatcher == null, 
+    Preconditions.checkState(registeredDispatcher == null,
         "Multiple dispatchers cannot be registered for: " + eventType.getName());
   }
 
@@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
-    LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass());
+    LOG.info(
+          "Registering " + eventType + " for independent dispatch using: " + handler.getClass());
     AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads);
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
@@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa
     /* check to see if we have a listener registered */
     checkForExistingDispatchers(true, eventType);
     LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: "
-        + handler.getClass());
+          + handler.getClass());
     dispatcher.register(eventType, handler);
     eventDispatchers.put(eventType, dispatcher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9c78377..55e1e6e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -81,13 +81,10 @@ public class TezUtilsInternal {
 
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] compressed = compressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
           + ", CompressTime: " + sw.elapsedMillis());
     }
@@ -95,13 +92,10 @@ public class TezUtilsInternal {
   }
 
   public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
           + ", UncompressTimeTaken: " + sw.elapsedMillis());
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 49ba802..f2194fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -435,7 +435,6 @@ public class DAGAppMaster extends AbstractService {
 
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
-    LOG.info("Adding session token to jobTokenSecretManager for application");
     jobTokenSecretManager.addTokenForJob(
         appAttemptID.getApplicationId().toString(), sessionToken);
 
@@ -461,8 +460,11 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler());
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
-    if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
-        TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) {
+    boolean useConcurrentDispatcher =
+        conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER,
+            TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT);
+    LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher);
+    if (!useConcurrentDispatcher) {
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
     } else {
@@ -522,7 +524,7 @@ public class DAGAppMaster extends AbstractService {
     currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
         appAttemptID.getAttemptId());
     if (LOG.isDebugEnabled()) {
-      LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID
+      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
           + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
           + " recoveryAttemptDir :" + currentRecoveryDataDir);
     }
@@ -888,7 +890,7 @@ public class DAGAppMaster extends AbstractService {
 
     try {
       if (LOG.isDebugEnabled()) {
-        LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+        LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
             + ", json="
             + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
       }
@@ -2027,6 +2029,7 @@ public class DAGAppMaster extends AbstractService {
   public static void main(String[] args) {
     try {
       Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      final String pid = System.getenv().get("JVM_PID");
       String containerIdStr =
           System.getenv(Environment.CONTAINER_ID.name());
       String nodeHostString = System.getenv(Environment.NM_HOST.name());
@@ -2066,6 +2069,18 @@ public class DAGAppMaster extends AbstractService {
           false, "Run Tez Application Master in Session mode");
 
       CommandLine cliParser = new GnuParser().parse(opts, args);
+      boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+
+      LOG.info("Creating DAGAppMaster for "
+          + "applicationId=" + applicationAttemptId.getApplicationId()
+          + ", attemptNum=" + applicationAttemptId.getAttemptId()
+          + ", AMContainerId=" + containerId
+          + ", jvmPid=" + pid
+          + ", userFromEnv=" + jobUserName
+          + ", cliSessionOption=" + sessionModeCliOption
+          + ", pwd=" + System.getenv(Environment.PWD.name())
+          + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name())
+          + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name()));
 
       // TODO Does this really need to be a YarnConfiguration ?
       Configuration conf = new Configuration(new YarnConfiguration());
@@ -2077,7 +2092,7 @@ public class DAGAppMaster extends AbstractService {
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime,
-              cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION),
+              sessionModeCliOption,
               System.getenv(Environment.PWD.name()),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 4a8a286..13128f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -100,7 +100,7 @@ public class RootInputInitializerManager {
     this.vertex = vertex;
     this.eventHandler = appContext.getEventHandler();
     this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
+        .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
     this.entityStateTracker = stateTracker;

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f03dcd1..88dcc8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -181,4 +181,6 @@ public interface Vertex extends Comparable<Vertex> {
   public int getKilledTaskAttemptCount();
 
   public Configuration getConf();
+
+  public boolean isSpeculationEnabled();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index bdc0207..f63f461 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -785,10 +785,12 @@ public class TaskAttemptImpl implements TaskAttempt,
             );
       }
       if (oldState != getInternalState()) {
-          LOG.info(attemptId + " TaskAttempt Transitioned from "
-           + oldState + " to "
-           + getInternalState() + " due to event "
-           + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(attemptId + " TaskAttempt Transitioned from "
+              + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1118,7 +1120,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskSpec remoteTaskSpec;
       try {
         remoteTaskSpec = ta.createRemoteTaskSpec();
-        LOG.info("remoteTaskSpec:" + remoteTaskSpec);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
+        }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
         LOG.error(msg, e);

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e6027f5..46215d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -501,7 +501,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         int toEventId = actualMax + fromEventId;
         events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
-            + "-" + toEventId + ")");
+            + "-" + toEventId + ").");
         // currently not modifying the events so that we dont have to create
         // copies of events. e.g. if we have to set taskAttemptId into the TezEvent
         // destination metadata then we will need to create a copy of the TezEvent
@@ -760,12 +760,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Commit go/no-go request from " + taskAttemptID);
+      }
       TaskState state = getState();
       if (state == TaskState.SCHEDULED) {
         // the actual running task ran and is done and asking for commit. we are still stuck 
         // in the scheduled state which indicates a backlog in event processing. lets wait for the
         // backlog to clear. returning false will make the attempt come back to us.
-        LOG.debug("Event processing delay. "
+        LOG.info(
+            "Event processing delay. "
             + "Attempt committing before state machine transitioned to running : Task {}", taskId);
         return false;
       }
@@ -796,7 +800,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         }
       } else {
         if (commitAttempt.equals(taskAttemptID)) {
-          LOG.info(taskAttemptID + " given a go for committing the task output.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(taskAttemptID + " already given a go for committing the task output.");
+          }
           return true;
         }
         // Don't think this can be a pluggable decision, so simply raise an
@@ -804,9 +810,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // Wait for commit attempt to succeed. Dont kill this. If commit
         // attempt fails then choose a different committer. When commit attempt
         // succeeds then this and others will be killed
-        LOG.info(commitAttempt
-            + " is current committer. Commit waiting for:  "
-            + taskAttemptID);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(commitAttempt + " is current committer. Commit waiting for:  " + taskAttemptID);
+        }
         return false;
       }
 
@@ -814,7 +820,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-  
+
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
@@ -899,9 +905,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         internalError(event.getType());
       }
       if (oldState != getInternalState()) {
-        LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getInternalState() + " due to event "
-            + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getInternalState() + " due to event "
+              + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1112,7 +1120,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
-            task.successfulAttempt + " has succeeded");
+              task.successfulAttempt + " has succeeded");
           String diagnostics = null;
           TaskAttemptTerminationCause errCause = null;
           if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
@@ -1469,7 +1477,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       } else {
         // nothing to do
         LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
-                 task.successfulAttempt + " is already successful");
+            task.successfulAttempt + " is already successful");
         return TaskStateInternal.SUCCEEDED;
       }
     }
@@ -1512,7 +1520,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
     if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
-      LOG.info("Removing commit attempt: " + commitAttempt);
+      LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed");
       commitAttempt = null;
     }
     if (attempt != null && !attempt.isFinished()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index daeae3f..4a8309e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -877,6 +877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
+    this.logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
 
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
@@ -935,6 +936,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     this.containerContext = new ContainerContext(this.localResources,
         appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
+    LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
 
     if (vertexPlan.getInputsCount() > 0) {
       setAdditionalInputs(vertexPlan.getInputsList());
@@ -957,7 +959,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
     }
     
-    logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
 
@@ -971,7 +973,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return vertexConf;
   }
 
-  private boolean isSpeculationEnabled() {
+  @Override
+  public boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }
 
@@ -2005,29 +2008,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
+  private static String constructCheckTasksForCompletionLog(VertexImpl vertex) {
+    String logLine = vertex.logIdentifier
+        + ", tasks=" + vertex.numTasks
+        + ", failed=" + vertex.failedTaskCount
+        + ", killed=" + vertex.killedTaskCount
+        + ", success=" + vertex.succeededTaskCount
+        + ", completed=" + vertex.completedTaskCount
+        + ", commits=" + vertex.commitFutures.size()
+        + ", err=" + vertex.terminationCause;
+    return logLine;
+  }
+
   // triggered by task_complete
   static VertexState checkTasksForCompletion(final VertexImpl vertex) {
-
-    LOG.info("Checking tasks for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    // this log helps quickly count the completion count for a vertex.
+    // grepping and counting for attempts and handling re-tries is time consuming
+    LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex));
     //check for vertex failure first
     if (vertex.completedTaskCount > vertex.tasks.size()) {
       LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
-          + " for vertex " + vertex.logIdentifier
-          + ", numTasks=" + vertex.numTasks
-          + ", failedTaskCount=" + vertex.failedTaskCount
-          + ", killedTaskCount=" + vertex.killedTaskCount
-          + ", successfulTaskCount=" + vertex.succeededTaskCount
-          + ", completedTaskCount=" + vertex.completedTaskCount
-          + ", terminationCause=" + vertex.terminationCause);
+          + constructCheckTasksForCompletionLog(vertex));
     }
 
     if (vertex.completedTaskCount == vertex.tasks.size()) {
@@ -2036,7 +2037,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
-        LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier);
+        LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
         if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
           // start commit if there're commits or just finish if no commits
           return commitOrFinish(vertex);
@@ -2054,16 +2055,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   //triggered by commit_complete
   static VertexState checkCommitsForCompletion(final VertexImpl vertex) {
-    LOG.info("Checking commits for vertex completion for "
-        + vertex.logIdentifier
-        + ", numTasks=" + vertex.numTasks
-        + ", failedTaskCount=" + vertex.failedTaskCount
-        + ", killedTaskCount=" + vertex.killedTaskCount
-        + ", successfulTaskCount=" + vertex.succeededTaskCount
-        + ", completedTaskCount=" + vertex.completedTaskCount
-        + ", commitInProgress=" + vertex.commitFutures.size()
-        + ", terminationCause=" + vertex.terminationCause);
-
+    LOG.info("Commits completion: "
+            + constructCheckTasksForCompletionLog(vertex));
     // terminationCause is null mean commit is succeeded, otherwise terminationCause will be set.
     if (vertex.terminationCause == null) {
       Preconditions.checkState(vertex.getState() == VertexState.COMMITTING,
@@ -2184,20 +2177,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   private void initializeCommitters() throws Exception {
     if (!this.additionalOutputSpecs.isEmpty()) {
-      LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+      LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" +
+          additionalOutputs.size());
       for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry:
           additionalOutputs.entrySet())  {
         final String outputName = entry.getKey();
         final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue();
         if (od.getControllerDescriptor() == null
             || od.getControllerDescriptor().getClassName() == null) {
-          LOG.info("Ignoring committer as none specified for output="
-              + outputName
-              + ", vertexId=" + logIdentifier);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring committer as none specified for output="
+                + outputName
+                + ", vertexId=" + logIdentifier);
+          }
           continue;
         }
         LOG.info("Instantiating committer for output=" + outputName
-            + ", vertexId=" + logIdentifier
+            + ", vertex=" + logIdentifier
             + ", committerClass=" + od.getControllerDescriptor().getClassName());
 
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -2214,12 +2210,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 .createClazzInstance(od.getControllerDescriptor().getClassName(),
                     new Class[]{OutputCommitterContext.class},
                     new Object[]{outputCommitterContext});
-            LOG.info("Invoking committer init for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer init for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.initialize();
             outputCommitters.put(outputName, outputCommitter);
-            LOG.info("Invoking committer setup for output=" + outputName
-                + ", vertexId=" + logIdentifier);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Invoking committer setup for output=" + outputName
+                  + ", vertex=" + logIdentifier);
+            }
             outputCommitter.setupOutput();
             return null;
           }
@@ -3913,8 +3913,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
-      LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
-          + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -4210,10 +4208,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           int numEventsSent = events.size() - numPreRoutedEvents;
           if (numEventsSent > 0) {
             StringBuilder builder = new StringBuilder();
-            builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
-            .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
-            .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
-            .append(getLogIdentifier());
+            builder.append("Sending ").append(attemptID).append(" ")
+                .append(numEventsSent)
+                .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
+                .append(") total ").append(currEventCount).append(" ")
+                .append(getLogIdentifier());
             LOG.info(builder.toString());
           }
         }
@@ -4478,9 +4477,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     for (String inputName : inputsWithInitializers) {
       inputList.add(rootInputDescriptors.get(inputName));
     }
-    LOG.info("Vertex will initialize via inputInitializers "
-        + logIdentifier + ". Starting root input initializers: "
-        + inputsWithInitializers.size());
+    LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " +
+        logIdentifier);
     initWaitsForRootInitializers = true;
     rootInputInitializerManager.runInputInitializers(inputList);
     // Send pending rootInputInitializerEvents
@@ -4564,6 +4562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
+    LOG.info("Setting " + inputs.size() + " additional inputs for vertex" + this.logIdentifier);
     this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
       addIO(input.getName());
@@ -4608,7 +4607,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
-    LOG.info("setting additional outputs for vertex " + this.vertexName);
+    LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier);
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 94889a1..da2200e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -147,7 +147,7 @@ public class ContainerLauncherImpl extends AbstractService implements
 
     @SuppressWarnings("unchecked")
     public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
-      LOG.info("Launching Container with Id: " + event.getContainerId());
+      LOG.info("Launching " + event.getContainerId());
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
         sendContainerLaunchFailedMsg(event.getContainerId(),
@@ -211,8 +211,7 @@ public class ContainerLauncherImpl extends AbstractService implements
       if(this.state == ContainerState.PREP) {
         this.state = ContainerState.KILLED_BEFORE_LAUNCH;
       } else {
-        LOG.info("Sending a stop request to the NM for ContainerId: "
-            + containerID);
+        LOG.info("Stopping " + containerID);
 
         ContainerManagementProtocolProxyData proxy = null;
         try {
@@ -378,7 +377,9 @@ public class ContainerLauncherImpl extends AbstractService implements
 
     @Override
     public void run() {
-      LOG.info("Processing the event " + event.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing event: " + event.toString());
+      }
 
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 6d57737..785caf7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -144,7 +144,9 @@ public class TaskSchedulerEventHandler extends AbstractService
   }
 
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    LOG.info("Processing the event " + sEvent.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing the event " + sEvent.toString());
+    }
     switch (sEvent.getType()) {
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
@@ -160,7 +162,7 @@ public class TaskSchedulerEventHandler extends AbstractService
         handleTASucceeded(event);
         break;
       default:
-        throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
+        throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
       }
       break;
     case S_CONTAINER_DEALLOCATE:
@@ -302,8 +304,8 @@ public class TaskSchedulerEventHandler extends AbstractService
               event);
           return;
         }
-        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity 
-            + " but no locality information exists for it. Ignoring hint.");
+        LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+            + taskAttempt.getID() + " Ignoring.");
         // fall through with null hosts/racks
       } else {
         hosts = (locationHint.getHosts() != null) ? locationHint

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 73fcb3d..f35a45f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -348,7 +348,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         "Heartbeats between preemptions should be >=1");
 
     delayedContainerManager = new DelayedContainerManager();
-    LOG.info("TaskScheduler initialized with configuration: " +
+    LOG.info("YarnTaskScheduler initialized with configuration: " +
             "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
             ", containerReuseEnabled: " + shouldReuseContainers +
             ", reuseRackLocal: " + reuseRackLocal +
@@ -428,8 +428,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
     if (isStopStarted.get()) {
-      for (ContainerStatus status : statuses) {
-        LOG.info("Container " + status.getContainerId() + " is completed");
+      if (LOG.isDebugEnabled()) {
+        for (ContainerStatus status : statuses) {
+          LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" +
+              status);
+        }
       }
       return;
     }
@@ -450,8 +453,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           // being released
           // completion of a container we had released earlier
           // an allocated container completed. notify app
-          LOG.info("Released container completed:" + completedId +
-                   " last allocated to task: " + task);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Released container completed:" + completedId +
+                " last allocated to task: " + task);
+          }
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -467,9 +472,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         }
         if(task != null) {
           // completion of a container we have allocated currently
-          // an allocated container completed. notify app
-          LOG.info("Allocated container completed:" + completedId +
-                   " last allocated to task: " + task);
+          // an allocated container completed. notify app. This will cause attempt to get killed
+          LOG.info(
+              "Allocated container completed:" + completedId + " last allocated to task: " + task);
           appContainerStatus.put(task, containerStatus);
           continue;
         }
@@ -488,9 +493,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   @Override
   public void onContainersAllocated(List<Container> containers) {
     if (isStopStarted.get()) {
-      for (Container container : containers) {
-        LOG.info("Release container:" + container.getId() + ", because it is shutting down.");
-        releaseContainer(container.getId());
+      LOG.info("Ignoring container allocations because application is shutting down. Num " + 
+          containers.size());
+      if (LOG.isDebugEnabled()) {
+        for (Container container : containers) {
+          LOG.debug("Release container:" + container.getId() + ", because App is shutting down.");
+          releaseContainer(container.getId());
+        }
       }
       return;
     }
@@ -549,6 +558,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
     // Release any unassigned containers given by the RM
+    if (containers.iterator().hasNext()) {
+      LOG.info("Releasing newly assigned containers which could not be allocated");
+    }
     releaseUnassignedContainers(containers);
 
     return assignedContainers;
@@ -602,15 +614,15 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Trying to assign a delayed container"
-        + ", containerId=" + heldContainer.getContainer().getId()
-        + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
-        + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
-        + ", AMState=" + state
-        + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
-        + ", taskRequestsCount=" + taskRequests.size()
-        + ", heldContainers=" + heldContainers.size()
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", isNew=" + isNew);
+          + ", containerId=" + heldContainer.getContainer().getId()
+          + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+          + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+          + ", AMState=" + state
+          + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+          + ", taskRequestsCount=" + taskRequests.size()
+          + ", heldContainers=" + heldContainers.size()
+          + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+          + ", isNew=" + isNew);
     }
 
     if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
@@ -658,7 +670,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
             + ", isNew=" + isNew);
           releaseUnassignedContainers(
-              Lists.newArrayList(heldContainer.getContainer()));        
+              Collections.singletonList((heldContainer.getContainer())));        
       } else {
         // no outstanding work and container idle timeout not expired
         if (LOG.isDebugEnabled()) {
@@ -711,7 +723,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         assignReUsedContainerWithLocation(containerToAssign,
             NODE_LOCAL_ASSIGNER, assignedContainers, true);
         if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-          LOG.info("Failed to assign tasks to delayed container using node"
+          LOG.debug("Failed to assign tasks to delayed container using node"
             + ", containerId=" + heldContainer.getContainer().getId());
         }
       }
@@ -727,7 +739,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           assignReUsedContainerWithLocation(containerToAssign,
               RACK_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using rack"
+            LOG.debug("Failed to assign tasks to delayed container using rack"
               + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -743,7 +755,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
          assignReUsedContainerWithLocation(containerToAssign,
               NON_LOCAL_ASSIGNER, assignedContainers, false);
           if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
-            LOG.info("Failed to assign tasks to delayed container using non-local"
+            LOG.debug("Failed to assign tasks to delayed container using non-local"
                 + ", containerId=" + heldContainer.getContainer().getId());
           }
         }
@@ -765,10 +777,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
           && idleContainerTimeoutMin != -1) {
           LOG.info("Container's idle timeout expired. Releasing container"
-            + ", containerId=" + heldContainer.container.getId()
-            + ", containerExpiryTime="
-            + heldContainer.getContainerExpiryTime()
-            + ", idleTimeoutMin=" + idleContainerTimeoutMin);
+              + ", containerId=" + heldContainer.container.getId()
+              + ", containerExpiryTime="
+              + heldContainer.getContainerExpiryTime()
+              + ", idleTimeoutMin=" + idleContainerTimeoutMin);
           releaseUnassignedContainers(
             Lists.newArrayList(heldContainer.container));
         } else {
@@ -815,11 +827,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             if (safeToRelease && 
                 (!taskRequests.isEmpty() || !appContext.isSession())) {
               LOG.info("Releasing held container as either there are pending but "
-                + " unmatched requests or this is not a session"
-                + ", containerId=" + heldContainer.container.getId()
-                + ", pendingTasks=" + taskRequests.size()
-                + ", isSession=" + appContext.isSession()
-                + ". isNew=" + isNew);
+                  + " unmatched requests or this is not a session"
+                  + ", containerId=" + heldContainer.container.getId()
+                  + ", pendingTasks=" + taskRequests.size()
+                  + ", isSession=" + appContext.isSession()
+                  + ". isNew=" + isNew);
               releaseUnassignedContainers(
                 Lists.newArrayList(heldContainer.container));
             } else {
@@ -894,8 +906,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // TODO this will not handle dynamic changes in resources
       totalResources = Resources.clone(getAvailableResources());
       LOG.info("App total resource memory: " + totalResources.getMemory() +
-               " cpu: " + totalResources.getVirtualCores() +
-               " taskAllocations: " + taskAllocations.size());
+          " cpu: " + totalResources.getVirtualCores() +
+          " taskAllocations: " + taskAllocations.size());
     }
 
     numHeartbeats++;
@@ -994,9 +1006,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     // See if any of the delayedContainers can be used for this task.
     delayedContainerManager.triggerScheduling(true);
     LOG.info("Allocation request for task: " + task +
-      " with request: " + request + 
-      " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
-      " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
+          " with request: " + request +
+          " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") +
+          " rack: " + ((racks != null && racks.length > 0) ? racks[0] : "null"));
   }
 
   /**
@@ -1025,8 +1037,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         LOG.info("Ignoring removal of unknown task: " + task);
         return false;
       } else {
-        LOG.info("Deallocated task: " + task + " from container: "
-            + container.getId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deallocated task: " + task + " from container: "
+              + container.getId());
+        }
 
         if (!taskSucceeded || !shouldReuseContainers) {
           if (LOG.isDebugEnabled()) {
@@ -1046,6 +1060,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             }
             assignedContainers = assignDelayedContainer(heldContainer);
           } else {
+            // this is a non standard situation
             LOG.info("Skipping container after task deallocate as container is"
                 + " no longer running, containerId=" + container.getId());
           }
@@ -1064,8 +1079,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   public synchronized Object deallocateContainer(ContainerId containerId) {
     Object task = unAssignContainer(containerId, true);
     if(task != null) {
+      // non-standard case for the app layer to deallocate container
       LOG.info("Deallocated container: " + containerId +
-        " from task: " + task);
+          " from task: " + task);
       return task;
     }
 
@@ -1075,9 +1091,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   @Override
   public synchronized void initiateStop() {
-    LOG.info("Initiate stop to YarnTaskScheduler");
+    LOG.info("Initiating stop of YarnTaskScheduler");
     // release held containers
-    LOG.info("Release held containers");
+    LOG.info("Releasing held containers");
     isStopStarted.set(true);
     // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException
     // because method releaseContainer will change heldContainers.
@@ -1090,7 +1106,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     }
 
     // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat
-    LOG.info("Remove all the taskRequests");
+    LOG.info("Removing all pending taskRequests");
     // Create a new list for tasks to avoid ConcurrentModificationException
     List<Object> tasks = new ArrayList<Object>(taskRequests.size());
     for (Object task : taskRequests.keySet()) {
@@ -1117,6 +1133,14 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     return (int) Math.ceil((original * percent)/100.f);
   }
   
+  private String constructPreemptionPeriodicLog(Resource freeResource) {
+    return "Allocated: " + allocatedResources +
+      " Free: " + freeResource +
+      " pendingRequests: " + taskRequests.size() +
+      " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
+      " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
+  }
+  
   void preemptIfNeeded() {
     if (preemptionPercentage == 0) {
       // turned off
@@ -1127,10 +1151,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     synchronized (this) {
       Resource freeResources = amRmClient.getAvailableResources();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
-          " cpu:" + allocatedResources.getVirtualCores() + 
-          " delayedContainers: " + delayedContainerManager.delayedContainers.size() +
-          " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption);
+        LOG.debug(constructPreemptionPeriodicLog(freeResources));
+      } else {
+        if (numHeartbeats % 50 == 1) {
+          LOG.info(constructPreemptionPeriodicLog(freeResources));
+        }
       }
       assert freeResources.getMemory() >= 0;
   
@@ -1156,8 +1181,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       
       if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Highest pri request: " + highestPriRequest + " fits in available resources "
-              + freeResources);
+          LOG.debug(highestPriRequest + " fits in free resources");
+        } else {
+          if (numHeartbeats % 50 == 1) {
+            LOG.info(highestPriRequest + " fits in free resources");
+          }
         }
         return;
       }
@@ -1651,8 +1679,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
 
   private void releaseUnassignedContainers(Iterable<Container> containers) {
     for (Container container : containers) {
-      LOG.info("Releasing unused container: "
-          + container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Releasing unused container: " + container.getId());
+      }
       releaseContainer(container.getId());
     }
   }
@@ -1721,19 +1750,17 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       Object task = getTask(assigned);
       assert task != null;
 
-      LOG.info("Assigning container to task"
-        + ", container=" + container
+      LOG.info("Assigning container to task: "
+        + "containerId=" + container.getId()
         + ", task=" + task
-        + ", containerHost=" + container.getNodeId().getHost()
+        + ", containerHost=" + container.getNodeId()
+        + ", containerPriority= " + container.getPriority()
+        + ", containerResources=" + container.getResource()
         + ", localityMatchType=" + locality
         + ", matchedLocation=" + matchedLocation
         + ", honorLocalityFlags=" + honorLocalityFlags
-        + ", reusedContainer="
-        + containerAssignments.containsKey(container.getId())
-        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
-        + ", containerResourceMemory=" + container.getResource().getMemory()
-        + ", containerResourceVCores="
-        + container.getResource().getVirtualCores());
+        + ", reusedContainer=" + containerAssignments.containsKey(container.getId())
+        + ", delayedContainers=" + delayedContainerManager.delayedContainers.size());
 
       assignContainer(task, container, assigned);
     }
@@ -1921,6 +1948,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
                 heldContainers.get(delayedContainer.getContainer().getId())) {
               assignedContainers = assignDelayedContainer(delayedContainer);
             } else {
+              // non standard scenario
               LOG.info("Skipping delayed container as container is no longer"
                   + " running, containerId="
                   + delayedContainer.getContainer().getId());
@@ -1975,9 +2003,10 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           HeldContainer delayedContainer = iter.next();
           if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
             // this container is no longer held by us
+            // non standard scenario
             LOG.info("AssignAll - Skipping delayed container as container is no longer"
-                + " running, containerId="
-                + delayedContainer.getContainer().getId());
+                  + " running, containerId="
+                  + delayedContainer.getContainer().getId());
             iter.remove();
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 470fa56..11b5006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -113,8 +113,10 @@ public class AMContainerHelpers {
       // correctly, even though they may not be used by all tasks which will run
       // on this container.
 
-      LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
-          + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC");
+      }
       containerCredentials.addAll(credentials);
 
       DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
@@ -123,7 +125,9 @@ public class AMContainerHelpers {
           containerTokens_dob.getLength());
 
       // Add shuffle token
-      LOG.info("Putting shuffle token in serviceData");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Putting shuffle token in serviceData in common CLC");
+      }
       serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
           TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9b90752..8b6e861 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -397,9 +397,11 @@ public class AMContainerImpl implements AMContainer {
         // TODO Can't set state to COMPLETED. Add a default error state.
       }
       if (oldState != getState()) {
-        LOG.info("AMContainer " + this.containerId + " transitioned from "
-            + oldState + " to " + getState()
-            + " via event " + event.getType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("AMContainer " + this.containerId + " transitioned from "
+              + oldState + " to " + getState()
+              + " via event " + event.getType());
+        }
       }
     } finally {
       writeLock.unlock();
@@ -450,8 +452,10 @@ public class AMContainerImpl implements AMContainer {
       // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
       container.sendStartRequestToNM(clc);
-      LOG.info("Sending Launch Request for Container with id: " +
-          container.container.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending Launch Request for Container with id: " +
+            container.container.getId());
+      }
     }
   }
 
@@ -509,7 +513,7 @@ public class AMContainerImpl implements AMContainer {
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.deAllocate();
-      LOG.info(
+      LOG.warn(
           "Unexpected event type: " + cEvent.getType() + " while in state: " +
               container.getState() + ". Event: " + cEvent);
 
@@ -573,8 +577,6 @@ public class AMContainerImpl implements AMContainer {
         }
       }
 
-      LOG.info("Assigned taskAttempt + [" + container.currentAttempt +
-          "] to container: [" + container.getContainerId() + "]");
       AMContainerTask amContainerTask = new AMContainerTask(
           event.getRemoteTaskSpec(), container.additionalLocalResources,
           container.credentialsChanged ? container.credentials : null, container.credentialsChanged,

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 102cbe9..a067cee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -132,6 +132,8 @@ public class AMNodeTracker extends AbstractService implements
       AMNode amNode = nodeMap.get(nodeId);
       if (amNode == null) {
         LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+        // This implies the node exists on the cluster, but is not running a container for
+        // this application
       } else {
         amNode.handle(rEvent);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 9e275a2..e17a4d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing HistoryEventHandler");
-
     this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
         TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
 
@@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService {
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
 
+    LOG.info("Initializing HistoryEventHandler with"
+        + "recoveryEnabled=" + recoveryEnabled
+        + ", historyServiceClassName=" + historyServiceClassName);
+
     historyLoggingService =
         ReflectionUtils.createClazzInstance(historyServiceClassName);
     historyLoggingService.setAppContext(context);
@@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService {
       addService(recoveryService);
     }
     super.serviceInit(conf);
+
   }
 
   @Override
   public void serviceStart() throws Exception {
-    LOG.info("Starting HistoryEventHandler");
     super.serviceStart();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 9f24151..7d83db2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
 
   @Override
   public String toString() {
+    String counterStr = "";
+    if (state != TaskAttemptState.SUCCEEDED) {
+      counterStr = ", counters=" + ( tezCounters == null ? "null" :
+        tezCounters.toString()
+        .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+    }
     return "vertexName=" + vertexName
         + ", taskAttemptId=" + taskAttemptId
         + ", creationTime=" + creationTime
@@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
-        + ", lastDataEventSourceTA=" + 
-              ((dataEvents==null) ? 0:dataEvents.size())
-        + ", counters=" + (tezCounters == null ? "null" :
-          tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+        + counterStr;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a58b49e..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", taskAttemptId=" + taskAttemptId
         + ", startTime=" + launchTime
         + ", containerId=" + containerId
-        + ", nodeId=" + nodeId
-        + ", inProgressLogs=" + inProgressLogsUrl
-        + ", completedLogs=" + completedLogsUrl;
+        + ", nodeId=" + nodeId;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8852e02..4372d8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     if (loggingDisabled) {
       return;
     }
-    LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    }
     try {
       try {
         JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d870645..2fe0e6d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing RecoveryService");
     recoveryPath = appContext.getCurrentRecoveryDir();
     recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
     bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
@@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService {
     drainEventsFlag = conf.getBoolean(
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT);
+
+    LOG.info("RecoveryService initialized with "
+      + "recoveryPath=" + recoveryPath
+      + ", bufferSize(bytes)=" + bufferSize
+      + ", flushInterval(s)=" + flushInterval
+      + ", maxUnflushedEvents=" + maxUnflushedEvents);
   }
 
   @Override
   public void serviceStart() {
-    LOG.info("Starting RecoveryService");
     lastFlushTime = appContext.getClock().getTime();
     eventHandlingThread = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index 7a2aeab..c53994e 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
 log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
 
 log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t]|| %c{2} %m%n:
 
 #
 # Event Counter Appender

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 69237d4..1b66c8e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter {
     if (jobConf.getBoolean("mapred.reducer.new-api", false)
         || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
-      LOG.info("Using mapred newApiCommitter.");
     }
+    LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
+        " using " + (newApiCommitter ? "new" : "old") + "mapred API");
 
     if (newApiCommitter) {
       TaskAttemptID taskAttemptID = new TaskAttemptID(

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index dbc7748..b93e4ba 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws Exception {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());
     }
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
     Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
     
     sendSerializedEvents = conf.getBoolean(
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD,
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT);
-    LOG.info("Emitting serialized splits: " + sendSerializedEvents);
+
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
+      LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " +
+          getContext().getInputName());
       LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
     }
 
-    if (LOG.isDebugEnabled()) {
-      sw.reset().start();
-    }
+    sw.reset().start();
 
     int totalResource = getContext().getTotalAvailableResource().getMemory();
     int taskResource = getContext().getVertexTaskResource().getMemory();
@@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer {
 
     int numTasks = (int)((totalResource*waves)/taskResource);
 
+
+
+    boolean groupSplits = userPayloadProto.getGroupingEnabled();
     LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks
         + " tasks. Headroom: " + totalResource + " Task Resource: "
-        + taskResource + " waves: " + waves);
+        + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits);
 
     // Read all credentials into the credentials instance stored in JobConf.
     JobConf jobConf = new JobConf(conf);
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
 
     InputSplitInfoMem inputSplitInfo = null;
-    boolean groupSplits = userPayloadProto.getGroupingEnabled();
+
     if (groupSplits) {
-      LOG.info("Grouping input splits");
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks);
     } else {
       inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index e6b70d2..28d108e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer {
 
   @Override
   public List<Event> initialize() throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
+    Stopwatch sw = new Stopwatch().start();
     MRInputUserPayloadProto userPayloadProto = MRInputHelpers
         .parseMRInputPayload(getContext().getInputUserPayload());
+    sw.stop();
     if (LOG.isDebugEnabled()) {
-      sw.stop();
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());  
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 7f5e0e3..30e4a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -284,21 +284,27 @@ public class MRInputHelpers {
     InputSplitInfoMem splitInfoMem = null;
     JobConf jobConf = new JobConf(conf);
     if (jobConf.getUseNewMapper()) {
-      LOG.info("Generating mapreduce api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapreduce api input splits");
+      }
       Job job = Job.getInstance(conf);
       org.apache.hadoop.mapreduce.InputSplit[] splits =
           generateNewSplits(job, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, job.getCredentials(), job.getConfiguration());
     } else {
-      LOG.info("Generating mapred api input splits");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generating mapred api input splits");
+      }
       org.apache.hadoop.mapred.InputSplit[] splits =
           generateOldSplits(jobConf, groupSplits, targetTasks);
       splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
           splits.length, jobConf.getCredentials(), jobConf);
     }
-    LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
-        + splitInfoMem.getSplitsProto().getSerializedSize());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+          + splitInfoMem.getSplitsProto().getSerializedSize());
+    }
     return splitInfoMem;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 720af50..80828d4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     if (useNewApi) {
       oldPartitioner = null;
       if (partitions > 1) {
+        Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz =
+            (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                    org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
+        LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName());
         newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
-            .newInstance(
-                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
-                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
-                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
+            .newInstance(clazz, conf);
       } else {
         newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
           @Override
@@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
     } else {
       newPartitioner = null;
       if (partitions > 1) {
-        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+        Class<? extends org.apache.hadoop.mapred.Partitioner> clazz =
             (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
-                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class),
-            new JobConf(conf));
+                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class);
+        LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName());
+        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+            clazz, new JobConf(conf));
       } else {
         oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
           @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index d0e935f..6ea21e2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
           + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false");
       return;
     }
-    LOG.info("Initializing ATSService");
 
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
       YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
@@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     }
     sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
-    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+      + "maxEventsPerBatch=" + maxEventsPerBatch
+      + ", maxPollingTime(ms)=" + maxPollingTimeMillis
+      + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown
+      + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName);
+
     try {
       historyACLPolicyManager = ReflectionUtils.createClazzInstance(
           atsHistoryACLManagerClassName);
@@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
-    LOG.info("Starting ATSService");
     timelineClient.start();
 
     eventHandlingThread = new Thread(new Runnable() {