You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/22 20:04:54 UTC

[2/2] tez git commit: TEZ_2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components. (sseth)

TEZ_2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate
logging in Runtime components. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cd80d9ab
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cd80d9ab
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cd80d9ab

Branch: refs/heads/branch-0.7
Commit: cd80d9ab592f8c116031087da01ac3f6a84a86d2
Parents: c7a946c
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 22 11:04:25 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 22 11:04:25 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../resources/tez-container-log4j.properties    |   2 +-
 .../hadoop/mapred/split/TezGroupedSplit.java    |  12 +++
 .../hadoop/mapreduce/split/TezGroupedSplit.java |  12 +++
 .../org/apache/tez/mapreduce/input/MRInput.java |  36 ++++---
 .../tez/mapreduce/input/MRInputLegacy.java      |   8 +-
 .../tez/mapreduce/input/MultiMRInput.java       |  22 ++--
 .../apache/tez/mapreduce/output/MROutput.java   |  13 ++-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  39 ++++++-
 .../apache/tez/runtime/task/TaskReporter.java   |   2 +-
 .../library/common/InputAttemptIdentifier.java  |   2 +-
 .../runtime/library/common/TezRuntimeUtils.java |   8 +-
 .../runtime/library/common/ValuesIterator.java  |   2 -
 .../runtime/library/common/shuffle/Fetcher.java | 103 +++++++++++++------
 .../library/common/shuffle/HttpConnection.java  |   4 +-
 .../common/shuffle/ShuffleEventHandler.java     |   1 +
 .../library/common/shuffle/ShuffleUtils.java    |  40 +++++--
 .../impl/ShuffleInputEventHandlerImpl.java      |  41 ++++++--
 .../common/shuffle/impl/ShuffleManager.java     |  82 +++++++++------
 .../impl/SimpleFetchedInputAllocator.java       |  26 +++--
 .../orderedgrouped/FetcherOrderedGrouped.java   |  86 +++++++++++-----
 .../shuffle/orderedgrouped/MergeManager.java    |  52 +++++-----
 .../common/shuffle/orderedgrouped/Shuffle.java  |  36 ++++---
 .../ShuffleInputEventHandlerOrderedGrouped.java |  42 ++++++--
 .../orderedgrouped/ShuffleScheduler.java        |  56 ++++++----
 .../common/sort/impl/ExternalSorter.java        |  26 +++--
 .../common/sort/impl/PipelinedSorter.java       | 101 +++++++++++-------
 .../common/sort/impl/dflt/DefaultSorter.java    |  69 +++++++------
 .../writers/UnorderedPartitionedKVWriter.java   |  49 +++++----
 .../library/input/OrderedGroupedKVInput.java    |  20 ++--
 .../runtime/library/input/UnorderedKVInput.java |  16 ++-
 .../output/OrderedPartitionedKVOutput.java      |   6 +-
 .../library/output/UnorderedKVOutput.java       |   6 +-
 .../output/UnorderedPartitionedKVOutput.java    |   4 +-
 .../impl/TestSimpleFetchedInputAllocator.java   |   2 +-
 ...tShuffleInputEventHandlerOrderedGrouped.java |   3 +-
 .../sort/impl/dflt/TestDefaultSorter.java       |  10 +-
 37 files changed, 697 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b6204c..9fc3555 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components.
   TEZ-2843. Tez UI: Show error if in progress fails due to AM not reachable
   TEZ-2842. Tez UI: Update Tez App details page while in-progress
   TEZ-2834. Make Tez preemption resilient to incorrect free resource reported

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-dag/src/main/resources/tez-container-log4j.properties
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties
index c53994e..4620a78 100644
--- a/tez-dag/src/main/resources/tez-container-log4j.properties
+++ b/tez-dag/src/main/resources/tez-container-log4j.properties
@@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender
 log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
 
 log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t]|| %c{2} %m%n:
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n
 
 #
 # Event Counter Appender

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index 4f3a0f2..a9893aa 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -175,4 +176,15 @@ public class TezGroupedSplit implements InputSplit, Configurable {
   public String getRack() {
     return rack;
   }
+
+  @Override
+  public String toString() {
+    return "TezGroupedSplit{" +
+        "wrappedSplits=" + wrappedSplits +
+        ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' +
+        ", locations=" + Arrays.toString(locations) +
+        ", rack='" + rack + '\'' +
+        ", length=" + length +
+        '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index f85bbcd..430d2ec 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -185,4 +186,15 @@ public class TezGroupedSplit extends InputSplit
   public String getRack() {
     return rack;
   }
+
+  @Override
+  public String toString() {
+    return "TezGroupedSplit{" +
+        "wrappedSplits=" + wrappedSplits +
+        ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' +
+        ", locations=" + Arrays.toString(locations) +
+        ", rack='" + rack + '\'' +
+        ", length=" + length +
+        '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 70365cd..93161cb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -437,8 +437,9 @@ public class MRInput extends MRInputBase {
     getContext().inputIsReady();
     this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
         MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
-    LOG.info("Using New mapreduce API: " + useNewApi
-        + ", split information via event: " + splitInfoViaEvents);
+    LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
+        ", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" +
+        getNumPhysicalInputs());
     initializeInternal();
     return null;
   }
@@ -447,7 +448,6 @@ public class MRInput extends MRInputBase {
   public void start() {
     Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1,
         "Expecting 0 or 1 physical input for MRInput");
-    LOG.info("MRInput setup to received {} events", getNumPhysicalInputs());
   }
 
   @Private
@@ -490,7 +490,7 @@ public class MRInput extends MRInputBase {
     } finally {
       rrLock.unlock();
     }
-    LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName());
+    LOG.info("Initialized MRInput: " + getContext().getSourceVertexName());
   }
 
   /**
@@ -588,7 +588,9 @@ public class MRInput extends MRInputBase {
     rrLock.lock();
     try {
       initFromEventInternal(event);
-      LOG.info("Notifying on RecordReader Initialized");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized");
+      }
       rrInited.signal();
     } finally {
       rrLock.unlock();
@@ -599,7 +601,9 @@ public class MRInput extends MRInputBase {
     assert rrLock.getHoldCount() == 1;
     rrLock.lock();
     try {
-      LOG.info("Awaiting RecordReader initialization");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization");
+      }
       rrInited.await();
     } catch (Exception e) {
       throw new IOException(
@@ -621,22 +625,30 @@ public class MRInput extends MRInputBase {
   }
   
   private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException {
-    LOG.info("Initializing RecordReader from event");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event");
+    }
     Preconditions.checkState(initEvent != null, "InitEvent must be specified");
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
     Object split = null;
     if (useNewApi) {
       split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
-      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
-          + split);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+            split.getClass().getName() + ", NewSplit: "
+            + split);
+      }
 
     } else {
       split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
-      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
-          + split);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+            split.getClass().getName() + ", OldSplit: "
+            + split);
+      }
     }
     mrReader.setSplit(split);
-    LOG.info("Initialized RecordReader from event");
+    LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
   }
 
   private static class MRInputHelpersInternal extends MRInputHelpers {

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index d825d53..e83c36a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -73,7 +73,7 @@ public class MRInputLegacy extends MRInput {
 
   @Private
   protected void initializeInternal() throws IOException {
-    LOG.info("MRInputLegacy deferring initialization");
+    LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization");
   }
   
   @Private
@@ -130,7 +130,11 @@ public class MRInputLegacy extends MRInput {
       }
       if (splitInfoViaEvents && !inited) {
         if (initEvent == null) {
-          LOG.info("Awaiting init event before initializing record reader");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getContext().getSourceVertexName() +
+                " awaiting init event before initializing record reader");
+          }
+
           try {
             eventCondition.await();
           } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 44d9c96..4a792dc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -108,8 +108,8 @@ public class MultiMRInput extends MRInputBase {
   @Override
   public List<Event> initialize() throws IOException {
     super.initialize();
-    LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: "
-        + getNumPhysicalInputs());
+    LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
+        ", numPhysicalInputs=" + getNumPhysicalInputs());
     if (getNumPhysicalInputs() == 0) {
       getContext().inputIsReady();
     }
@@ -164,7 +164,9 @@ public class MultiMRInput extends MRInputBase {
 
   private MRReader initFromEvent(InputDataInformationEvent event) throws IOException {
     Preconditions.checkState(event != null, "Event must be specified");
-    LOG.info("Initializing Reader: " + eventCount.get());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
+    }
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
     Object split = null;
     MRReader reader = null;
@@ -176,17 +178,21 @@ public class MultiMRInput extends MRInputBase {
           .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
           .getApplicationId().getId(), getContext().getTaskIndex(), getContext()
           .getTaskAttemptNumber());
-      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: "
-          + split);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+            split.getClass().getName() + ", NewSplit: " + split);
+      }
 
     } else {
       split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
       reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
           getContext().getCounters(), inputRecordCounter);
-      LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: "
-          + split);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
+            split.getClass().getName() + ", OldSplit: " + split);
+      }
     }
-    LOG.info("Initialized RecordReader from event");
+    LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
     return reader;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index d19f707..7136482 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -343,7 +343,6 @@ public class MROutput extends AbstractLogicalOutput {
 
   @Override
   public List<Event> initialize() throws IOException, InterruptedException {
-    LOG.info("Initializing Simple Output");
     getContext().requestInitialMemory(0l, null); //mandatory call
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
@@ -380,6 +379,8 @@ public class MROutput extends AbstractLogicalOutput {
       }
     }
 
+    String outputFormatClassName;
+
     outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
 
     if (useNewApi) {
@@ -388,6 +389,7 @@ public class MROutput extends AbstractLogicalOutput {
         newOutputFormat =
             org.apache.hadoop.util.ReflectionUtils.newInstance(
                 newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
+        outputFormatClassName = newOutputFormat.getClass().getName();
       } catch (ClassNotFoundException cnfe) {
         throw new IOException(cnfe);
       }
@@ -404,6 +406,7 @@ public class MROutput extends AbstractLogicalOutput {
               jobConf, taskAttemptId,
               new MRTaskReporter(getContext()));
       oldOutputFormat = jobConf.getOutputFormat();
+      outputFormatClassName = oldOutputFormat.getClass().getName();
 
       FileSystem fs = FileSystem.get(jobConf);
       String finalName = getOutputName();
@@ -414,8 +417,9 @@ public class MROutput extends AbstractLogicalOutput {
     }
     initCommitter(jobConf, useNewApi);
 
-    LOG.info("Initialized Simple Output"
-        + ", using_new_api: " + useNewApi);
+    LOG.info(getContext().getDestinationVertexName() + ": "
+        + "outputFormat=" + outputFormatClassName
+        + ", using newmapreduce API=" + useNewApi);
     return null;
   }
 
@@ -517,6 +521,7 @@ public class MROutput extends AbstractLogicalOutput {
   @Override
   public synchronized List<Event> close() throws IOException {
     flush();
+    LOG.info(getContext().getDestinationVertexName() + " closed");
     long outputRecords = getContext().getCounters()
         .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
     getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
@@ -534,7 +539,6 @@ public class MROutput extends AbstractLogicalOutput {
       return;
     }
 
-    LOG.info("Flushing Simple Output");
     if (useNewApi) {
       try {
         newRecordWriter.close(newApiTaskAttemptContext);
@@ -544,7 +548,6 @@ public class MROutput extends AbstractLogicalOutput {
     } else {
       oldRecordWriter.close(null);
     }
-    LOG.info("Flushed Simple Output");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 92035e1..c61bb4e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -404,6 +404,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
+      String oldThreadName = Thread.currentThread().getName();
+      try {
+        Thread.currentThread().setName(oldThreadName + "{" + inputSpec.getSourceVertexName() + "}");
+        return _callInternal();
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+      }
+    }
+
+    protected Void _callInternal() throws Exception {
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initializing Input using InputSpec: " + inputSpec);
       }
@@ -437,6 +448,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
+      String oldThreadName = Thread.currentThread().getName();
+      try {
+        Thread.currentThread().setName(oldThreadName + " Start: {" + srcVertexName + "}");
+        return _callInternal();
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+      }
+    }
+
+    protected Void _callInternal() throws Exception {
+      Thread.currentThread().setName("InitializerStart {" + srcVertexName + "}");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Starting Input with src edge: " + srcVertexName);
       }
@@ -459,6 +481,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     @Override
     protected Void callInternal() throws Exception {
+      String oldThreadName = Thread.currentThread().getName();
+      try {
+        Thread.currentThread().setName(oldThreadName + "{" + outputSpec.getDestinationVertexName() + "}");
+        return _callInternal();
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+      }
+    }
+
+    protected Void _callInternal() throws Exception {
+      Thread.currentThread().setName("Initializer {" + outputSpec.getDestinationVertexName() + "}");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initializing Output using OutputSpec: " + outputSpec);
       }
@@ -727,8 +760,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     });
 
-    eventRouterThread.setName("TezTaskEventRouter["
-        + taskSpec.getTaskAttemptID().toString() + "]");
+    eventRouterThread.setName("TezTaskEventRouter{"
+        + taskSpec.getTaskAttemptID().toString() + "}");
     eventRouterThread.start();
   }
 
@@ -756,7 +789,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public void cleanup() throws InterruptedException {
-    LOG.info("Final Counters : " + getCounters().toShortString());
+    LOG.info("Final Counters for " + taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString());
     setTaskDone();
     if (eventRouterThread != null) {
       eventRouterThread.interrupt();

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index d9a7786..bf93ce3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -278,7 +278,7 @@ public class TaskReporter {
       int numEventsReceived = 0;
       if (task.isTaskDone() || task.hadFatalError()) {
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          LOG.info("Current task already complete, Ignoring all event in"
+          LOG.info("Current task already complete, Ignoring all events in"
               + " heartbeat response, eventCount=" + response.getEvents().size());
         }
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 59fb638..d70942c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -146,6 +146,6 @@ public class InputAttemptIdentifier {
   public String toString() {
     return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
         + ", attemptNumber=" + attemptNumber + ", pathComponent="
-        + pathComponent + ", fetchTypeInfo=" + fetchTypeInfo + ", spillEventId=" + spillEventId  +"]";
+        + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId  +"]";
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 69436ba..819423f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -64,7 +64,9 @@ public class TezRuntimeUtils {
     if (className == null) {
       return null;
     }
-    LOG.info("Using Combiner class: " + className);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using Combiner class: " + className);
+    }
     try {
       clazz = (Class<? extends Combiner>) conf.getClassByName(className);
     } catch (ClassNotFoundException e) {
@@ -105,7 +107,9 @@ public class TezRuntimeUtils {
           + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS), e);
     }
 
-    LOG.info("Using partitioner class: " + clazz.getName());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using partitioner class: " + clazz.getName());
+    }
 
     Partitioner partitioner = null;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index 24f9f8a..f4da742 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -45,7 +45,6 @@ import com.google.common.base.Preconditions;
 
 @Private
 public class ValuesIterator<KEY,VALUE> {
-  private static final Logger LOG = LoggerFactory.getLogger(ValuesIterator.class.getName());
   protected TezRawKeyValueIterator in; //input iterator
   private KEY key;               // current key
   private KEY nextKey;
@@ -82,7 +81,6 @@ public class ValuesIterator<KEY,VALUE> {
     this.keyDeserializer.open(keyIn);
     this.valDeserializer = serializationFactory.getDeserializer(valClass);
     this.valDeserializer.open(this.valueIn);
-    LOG.info("keyDeserializer=" + keyDeserializer + "; valueDeserializer=" + valDeserializer);
   }
 
   TezRawKeyValueIterator getRawIterator() { return in; }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 0dee90c..be68cc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -202,8 +202,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
         }
       } else {
-        LOG.info("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
-            " inputs since the fetcher has already been stopped");
+        if (isDebugEnabled) {
+          LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
+              " inputs since the fetcher has already been stopped");
+        }
       }
     }
 
@@ -409,9 +411,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // indirectly penalizing the host
       InputAttemptIdentifier[] failedFetches = null;
       if (isShutDown.get()) {
-        LOG.info(
-            "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
-                e.getClass().getName() + ", Message: " + e.getMessage());
+        if (isDebugEnabled) {
+          LOG.debug(
+              "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
+                  e.getClass().getName() + ", Message: " + e.getMessage());
+        }
       } else {
         failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
       }
@@ -420,7 +424,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     if (isShutDown.get()) {
       // shutdown would have no effect if in the process of establishing the connection.
       shutdownInternal();
-      LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+      if (isDebugEnabled) {
+        LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
+      }
       return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
     }
 
@@ -434,9 +440,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // with the first map, typically lost map. So, penalize only that map
       // and add the rest
       if (isShutDown.get()) {
-        LOG.info(
-            "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
-                e.getClass().getName() + ", Message: " + e.getMessage());
+        if (isDebugEnabled) {
+          LOG.debug(
+              "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." +
+                  e.getClass().getName() + ", Message: " + e.getMessage());
+        }
       } else {
         InputAttemptIdentifier firstAttempt = attempts.get(0);
         LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
@@ -462,7 +470,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     if (isShutDown.get()) {
       // shutdown would have no effect if in the process of establishing the connection.
       shutdownInternal();
-      LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
+      if (isDebugEnabled) {
+        LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
+      }
       return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false);
     }
     // After this point, closing the stream and connection, should cause a
@@ -477,7 +487,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     while (!remaining.isEmpty() && failedInputs == null) {
       if (isShutDown.get()) {
         shutdownInternal(true);
-        LOG.info("Fetcher already shutdown. Aborting queued fetches for " + remaining.size() + " inputs");
+        if (isDebugEnabled) {
+          LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + remaining.size() + " inputs");
+        }
         return new HostFetchResult(new FetchResult(host, port, partition, remaining), null,
             false);
       }
@@ -487,7 +499,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         //clean up connection
         shutdownInternal(true);
         if (isShutDown.get()) {
-          LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + remaining.size() + " inputs");
+          if (isDebugEnabled) {
+            LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " + remaining.size() + " inputs");
+          }
           return new HostFetchResult(new FetchResult(host, port, partition, remaining), null,
               false);
         }
@@ -501,8 +515,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
 
     if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
-      LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " +
-          failedInputs.length + " failed inputs");
+      if (isDebugEnabled) {
+        LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " +
+            failedInputs.length + " failed inputs");
+      }
       failedInputs = null;
     }
     return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
@@ -520,7 +536,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
     while (iterator.hasNext()) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown. Skipping fetch for " + remaining.size() + " inputs");
+        if (isDebugEnabled) {
+          LOG.debug("Already shutdown. Skipping fetch for " + remaining.size() + " inputs");
+        }
         break;
       }
       InputAttemptIdentifier srcAttemptId = iterator.next();
@@ -545,9 +563,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
               @Override
               public void freeResources(FetchedInput fetchedInput) {}
             });
-        LOG.info("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
-            + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
-            + " to " + fetchedInput.getType());
+        if (isDebugEnabled) {
+          LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+              + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+              + " to " + fetchedInput.getType());
+        }
 
         long endTime = System.currentTimeMillis();
         fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
@@ -556,9 +576,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       } catch (IOException e) {
         cleanupFetchedInput(fetchedInput);
         if (isShutDown.get()) {
-          LOG.info(
-              "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " +
-                  host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+          if (isDebugEnabled) {
+            LOG.debug(
+                "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId +
+                    " from host " +
+                    host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+          }
           break;
         }
         LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
@@ -569,8 +592,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     InputAttemptIdentifier[] failedFetches = null;
     if (failMissing && remaining.size() > 0) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown, not reporting fetch failures for: " + remaining.size() +
+        if (isDebugEnabled) {
+          LOG.debug("Already shutdown, not reporting fetch failures for: " + remaining.size() +
             " remaining inputs");
+        }
       } else {
         failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
       }
@@ -623,6 +648,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   public void shutdown() {
     if (!isShutDown.getAndSet(true)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Shutting down fetcher for host: " + host);
+      }
       shutdownInternal();
     }
   }
@@ -643,7 +671,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       } catch (IOException e) {
         LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
             + e.getMessage());
-        if (LOG.isDebugEnabled()) {
+        if (isDebugEnabled) {
           LOG.debug(StringUtils.EMPTY, e);
         }
       }
@@ -678,7 +706,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           // Don't know which one was bad, so consider all of them as bad
           return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
         } else {
-          LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+          if (isDebugEnabled) {
+            LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
+          }
           return null;
         }
       }
@@ -694,12 +724,14 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           assert (srcAttemptId != null);
           return new InputAttemptIdentifier[]{srcAttemptId};
         } else {
-          LOG.info("Already shutdown. Ignoring verification failure.");
+          if (isDebugEnabled) {
+            LOG.debug("Already shutdown. Ignoring verification failure.");
+          }
           return null;
         }
       }
 
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
         LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
             + ", decomp len: " + decompressedLength);
       }
@@ -723,10 +755,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // }
 
       // Go!
-      LOG.info("fetcher" + " about to shuffle output of srcAttempt "
-          + fetchedInput.getInputAttemptIdentifier() + " decomp: "
-          + decompressedLength + " len: " + compressedLength + " to "
-          + fetchedInput.getType());
+      if (isDebugEnabled) {
+        LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
+            + fetchedInput.getInputAttemptIdentifier() + " decomp: "
+            + decompressedLength + " len: " + compressedLength + " to "
+            + fetchedInput.getType());
+      }
 
       if (fetchedInput.getType() == Type.MEMORY) {
         ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
@@ -735,7 +769,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           fetchedInput.getInputAttemptIdentifier().toString());
       } else if (fetchedInput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
-          (host +":" +port), input, compressedLength, LOG,
+          (host +":" +port), input, compressedLength, decompressedLength, LOG,
           fetchedInput.getInputAttemptIdentifier().toString());
       } else {
         throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
@@ -765,8 +799,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     } catch (IOException ioe) {
       if (isShutDown.get()) {
         cleanupFetchedInput(fetchedInput);
-        LOG.info("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() +
-            ", Message: " + ioe.getMessage());
+        if (isDebugEnabled) {
+          LOG.debug(
+              "Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() +
+                  ", Message: " + ioe.getMessage());
+        }
         return null;
       }
       if (shouldRetry(srcAttemptId, ioe)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index ad6ed19..17e6e90 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -274,7 +274,9 @@ public class HttpConnection {
     stopWatch.reset().start();
     try {
       if (input != null) {
-        LOG.info("Closing input on " + logIdentifier);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing input on " + logIdentifier);
+        }
         input.close();
       }
       if (httpConnParams.keepAlive && connectionSucceeed) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
index ff66158..da7c944 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java
@@ -25,4 +25,5 @@ import org.apache.tez.runtime.api.Event;
 
 public interface ShuffleEventHandler {
   public void handleEvents(List<Event> events) throws IOException;
+  public void logProgress(boolean updateOnClose);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..1873485 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -107,12 +107,16 @@ public class ShuffleUtils {
       Logger LOG, String identifier) throws IOException {
     try {
       IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
-        ifileReadAhead, ifileReadAheadLength);
+          ifileReadAhead, ifileReadAheadLength);
       // metrics.inputBytes(shuffleData.length);
-      LOG.info("Read " + shuffleData.length + " bytes from input for "
-          + identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read " + shuffleData.length + " bytes from input for "
+            + identifier);
+      }
     } catch (IOException ioe) {
       // Close the streams
+      LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength +
+          ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
       ioCleanup(input);
       // Re-throw
       throw ioe;
@@ -120,7 +124,7 @@ public class ShuffleUtils {
   }
   
   public static void shuffleToDisk(OutputStream output, String hostIdentifier,
-      InputStream input, long compressedLength, Logger LOG, String identifier)
+      InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier)
       throws IOException {
     // Copy data to local-disk
     long bytesLeft = compressedLength;
@@ -138,12 +142,16 @@ public class ShuffleUtils {
         // metrics.inputBytes(n);
       }
 
-      LOG.info("Read " + (compressedLength - bytesLeft)
-          + " bytes from input for " + identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read " + (compressedLength - bytesLeft)
+            + " bytes from input for " + identifier);
+      }
 
       output.close();
     } catch (IOException ioe) {
       // Close the streams
+      LOG.info("Failed to read data to disk for " + identifier + ". len=" + compressedLength +
+          ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage());
       ioCleanup(input, output);
       // Re-throw
       throw ioe;
@@ -468,10 +476,26 @@ public class ShuffleUtils {
     }
     log.info(
         "Completed fetch for attempt: "
-            + srcAttemptIdentifier + " to " + outputType +
-            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+            + toShortString(srcAttemptIdentifier)
+            +" to " + outputType +
+            ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
             ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
             MBPS_FORMAT.get().format(rate) + " MB/s");
   }
+
+  private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex());
+    sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
+    sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
+    if (inputAttemptIdentifier.getFetchTypeInfo()
+        != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
+      sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
+      sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
+    }
+    sb.append("}");
+    return sb.toString();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 61b3e3a..8fb1568 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -22,6 +22,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.protobuf.ByteString;
 
@@ -32,20 +33,15 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputIdentifier;
-import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
-import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
-import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -57,16 +53,24 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerImpl.class);
   
   private final ShuffleManager shuffleManager;
+  //TODO: unused. Consider removing later?
   private final FetchedInputAllocator inputAllocator;
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   private final boolean useSharedInputs;
+  private final InputContext inputContext;
+
+  private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
+  private final AtomicInteger numDmeEvents = new AtomicInteger(0);
+  private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
+  private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
 
   public ShuffleInputEventHandlerImpl(InputContext inputContext,
                                       ShuffleManager shuffleManager,
                                       FetchedInputAllocator inputAllocator, CompressionCodec codec,
                                       boolean ifileReadAhead, int ifileReadAheadLength) {
+    this.inputContext = inputContext;
     this.shuffleManager = shuffleManager;
     this.inputAllocator = inputAllocator;
     this.codec = codec;
@@ -86,13 +90,29 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
+      numDmeEvents.incrementAndGet();
       processDataMovementEvent((DataMovementEvent)event);
       shuffleManager.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
-      processInputFailedEvent((InputFailedEvent)event);
+      numObsoletionEvents.incrementAndGet();
+      processInputFailedEvent((InputFailedEvent) event);
     } else {
       throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
     }
+    if (numDmeEvents.get() + numObsoletionEvents.get() > nextToLogEventCount.get()) {
+      logProgress(false);
+      // Log every 50 events seen.
+      nextToLogEventCount.addAndGet(50);
+    }
+  }
+
+  @Override
+  public void logProgress(boolean updateOnClose) {
+    LOG.info(inputContext.getSourceVertexName() + ": "
+        + "numDmeEventsSeen=" + numDmeEvents.get()
+        + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+        + ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
+        + (updateOnClose == true ? ", updateOnClose" : ""));
   }
 
   private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
@@ -104,9 +124,11 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
     int srcIndex = dme.getSourceIndex();
-    LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
-        + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
-        .stringify(shufflePayload));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
+          + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
+          .stringify(shufflePayload));
+    }
 
     if (shufflePayload.hasEmptyPartitions()) {
       byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
@@ -119,6 +141,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
           LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: ["
               + srcAttemptIdentifier + "]. Not fetching.");
         }
+        numDmeEventsNoData.incrementAndGet();
         shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
         return;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 2cfcc06..99fc18a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -228,11 +228,11 @@ public class ShuffleManager implements FetcherCallback {
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
         numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
+            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
+        .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build());
     this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
     this.schedulerCallable = new RunShuffleCallable(conf);
     
@@ -270,7 +270,7 @@ public class ShuffleManager implements FetcherCallback {
 
     shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>();
 
-    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+    LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
         + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
         + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
@@ -317,7 +317,7 @@ public class ShuffleManager implements FetcherCallback {
         }
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+          LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: " + numCompletedInputs);
         }
         if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
           lock.lock();
@@ -330,20 +330,20 @@ public class ShuffleManager implements FetcherCallback {
                 inputHost = pendingHosts.take();
               } catch (InterruptedException e) {
                 if (isShutdown.get()) {
-                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  LOG.info(srcNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                   break;
                 } else {
                   throw e;
                 }
               }
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
+                LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString());
               }
               if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
                 Fetcher fetcher = constructFetcherForHost(inputHost, conf);
                 runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
-                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                 }
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);
@@ -353,7 +353,7 @@ public class ShuffleManager implements FetcherCallback {
                 }
               } else {
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Skipping host: " + inputHost.getIdentifier()
+                  LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier()
                       + " since it has no inputs to process");
                 }
               }
@@ -364,8 +364,7 @@ public class ShuffleManager implements FetcherCallback {
         }
       }
       shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
-      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
-      // TODO NEWTEZ Maybe clean up inputs.
+      LOG.info(srcNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
       if (!fetcherExecutor.isShutdown()) {
         fetcherExecutor.shutdownNow();
       }
@@ -450,9 +449,11 @@ public class ShuffleManager implements FetcherCallback {
     }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
-    LOG.info("Created Fetcher for host: " + inputHost.getHost()
-        + ", info: " + inputHost.getAdditionalInfo()
-        + ", with inputs: " + pendingInputsForHost);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created Fetcher for host: " + inputHost.getHost()
+          + ", info: " + inputHost.getAdditionalInfo()
+          + ", with inputs: " + pendingInputsForHost);
+    }
     return fetcherBuilder.build();
   }
   
@@ -471,7 +472,7 @@ public class ShuffleManager implements FetcherCallback {
       }
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+      LOG.debug(srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host);
     }
 
     if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
@@ -501,7 +502,9 @@ public class ShuffleManager implements FetcherCallback {
   public void addCompletedInputWithNoData(
       InputAttemptIdentifier srcAttemptIdentifier) {
     InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    }
     
     if (!completedInputSet.contains(inputIdentifier)) {
       synchronized (completedInputSet) {
@@ -574,8 +577,10 @@ public class ShuffleManager implements FetcherCallback {
     }
 
     boolean isDone() {
-      LOG.info("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" +
-          eventsProcessed.cardinality());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" +
+            eventsProcessed.cardinality());
+      }
       return ((finalEventId != -1) && (finalEventId + 1) == eventsProcessed.cardinality());
     }
 
@@ -631,10 +636,10 @@ public class ShuffleManager implements FetcherCallback {
           lock.lock();
           try {
             totalBytesShuffledTillNow += fetchedBytes;
+            logProgress();
           } finally {
             lock.unlock();
           }
-          logProgress();
         }
       }
     }
@@ -751,7 +756,7 @@ public class ShuffleManager implements FetcherCallback {
       InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+    LOG.info(srcNameTrimmed + ": " + "Fetch failed for src: " + srcAttemptIdentifier
         + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
         + connectFailed);
     failedShufflesCounter.increment(1);
@@ -885,16 +890,23 @@ public class ShuffleManager implements FetcherCallback {
     }
   }
 
+  private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);
+
   private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
     int inputsDone = numCompletedInputs.get();
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " +
-        numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs) {
+      nextProgressLineEventCount.addAndGet(50);
+      double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+      long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+      double transferRate = mbs / secsSinceStart;
+      LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " +
+          numInputs +
+          ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+
+    }
   }
 
 
@@ -902,15 +914,17 @@ public class ShuffleManager implements FetcherCallback {
 
     @Override
     public void onSuccess(Void result) {
-      LOG.info("Scheduler thread completed");
+      LOG.info(srcNameTrimmed + ": " + "Scheduler thread completed");
     }
 
     @Override
     public void onFailure(Throwable t) {
       if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error: " + t);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error: " + t);
+        }
       } else {
-        LOG.error("Scheduler failed with error: ", t);
+        LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t);
         inputContext.fatalError(t, "Shuffle Scheduler Failed");
       }
     }
@@ -939,7 +953,9 @@ public class ShuffleManager implements FetcherCallback {
     public void onSuccess(FetchResult result) {
       fetcher.shutdown();
       if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring event from fetcher");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring event from fetcher");
+        }
       } else {
         Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
         if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
@@ -960,9 +976,11 @@ public class ShuffleManager implements FetcherCallback {
       // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
       fetcher.shutdown();
       if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error from fetcher: " + t);
+        }
       } else {
-        LOG.error("Fetcher failed with error: ", t);
+        LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t);
         shuffleError = t;
         inputContext.fatalError(t, "Fetch failed");
         doBookKeepingForFetcherComplete();

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
index 31a8651..604d213 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -60,11 +60,14 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
 
   private final long maxAvailableTaskMemory;
   private final long initialMemoryAvailable;
+
+  private final String srcNameTrimmed;
   
   private volatile long usedMemory = 0;
 
-  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
+  public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf,
       long maxTaskAvailableMemory, long memoryAvailable) {
+    this.srcNameTrimmed = srcNameTrimmed;
     this.conf = conf;    
     this.maxAvailableTaskMemory = maxTaskAvailableMemory;
     this.initialMemoryAvailable = memoryAvailable;
@@ -92,8 +95,6 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
       this.memoryLimit = initialMemoryAvailable;
     }
 
-    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-    
     final float singleShuffleMemoryLimitPercent = conf.getFloat(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
@@ -107,9 +108,13 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
     //TODO: cap it to MAX_VALUE until MemoryFetchedInput can handle > 2 GB
     this.maxSingleShuffleLimit = (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent),
         Integer.MAX_VALUE);
-    
-    LOG.info("SimpleInputManager -> " + "MemoryLimit: " + 
-        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+
+    LOG.info(srcNameTrimmed + ": "
+        + "RequestedMemory=" + memReq
+        + ", AssignedMemorty=" + this.memoryLimit
+        + ", maxSingleShuffleLimit=" + this.maxSingleShuffleLimit
+    );
+
   }
 
   @Private
@@ -137,7 +142,10 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
           fileNameAllocator);
     } else {
       this.usedMemory += actualSize;
-      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
+      if (LOG.isDebugEnabled()) {
+        LOG.info(srcNameTrimmed + ": " + "Used memory after allocating " + actualSize + " : " +
+            usedMemory);
+      }
       return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
     }
   }
@@ -196,7 +204,9 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
 
   private synchronized void unreserve(long size) {
     this.usedMemory -= size;
-    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(srcNameTrimmed + ": " + "Used memory after freeing " + size  + " : " + usedMemory);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 67b5aa0..0ba37dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -90,6 +90,8 @@ class FetcherOrderedGrouped extends Thread {
   volatile HttpURLConnection connection;
   volatile DataInputStream input;
 
+  volatile MapHost assignedHost = null;
+
   HttpConnection httpConnection;
   HttpConnectionParams httpConnectionParams;
 
@@ -139,15 +141,15 @@ class FetcherOrderedGrouped extends Thread {
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
 
-    this.logIdentifier = "fetcher [" + TezUtilsInternal
-        .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
+    this.logIdentifier = "fetcher {" + TezUtilsInternal
+        .cleanVertexName(inputContext.getSourceVertexName()) + "} #" + id;
     setName(logIdentifier);
     setDaemon(true);
   }  
 
   @VisibleForTesting
   protected void fetchNext() throws InterruptedException, IOException {
-    MapHost host = null;
+    assignedHost = null;
     try {
       // If merge is on, block
       merger.waitForInMemoryMerge();
@@ -156,20 +158,20 @@ class FetcherOrderedGrouped extends Thread {
       merger.waitForShuffleToMergeMemory();
 
       // Get a host to shuffle from
-      host = scheduler.getHost();
+      assignedHost = scheduler.getHost();
       metrics.threadBusy();
 
-      String hostPort = host.getHostIdentifier();
+      String hostPort = assignedHost.getHostIdentifier();
       if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
-        setupLocalDiskFetch(host);
+        setupLocalDiskFetch(assignedHost);
       } else {
         // Shuffle
-        copyFromHost(host);
+        copyFromHost(assignedHost);
       }
     } finally {
       cleanupCurrentConnection(false);
-      if (host != null) {
-        scheduler.freeHost(host);
+      if (assignedHost != null) {
+        scheduler.freeHost(assignedHost);
         metrics.threadFree();
       }
     }
@@ -191,6 +193,9 @@ class FetcherOrderedGrouped extends Thread {
 
   public void shutDown() throws InterruptedException {
     this.stopped = true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher stopped for host " + assignedHost);
+    }
     interrupt();
     cleanupCurrentConnection(true);
     try {
@@ -276,14 +281,19 @@ class FetcherOrderedGrouped extends Thread {
           // Setup connection again if disconnected
           cleanupCurrentConnection(true);
           if (stopped) {
-            LOG.info("Not re-establishing connection since Fetcher has been stopped");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Not re-establishing connection since Fetcher has been stopped");
+            }
             return;
           }
           // Connect with retry
           if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) {
             if (stopped) {
               cleanupCurrentConnection(true);
-              LOG.info("Not reporting connection re-establishment failure since fetcher is stopped");
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Not reporting connection re-establishment failure since fetcher is stopped");
+              }
               return;
             }
             failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
@@ -294,8 +304,10 @@ class FetcherOrderedGrouped extends Thread {
 
       if (failedTasks != null && failedTasks.length > 0) {
         if (stopped) {
-          LOG.info("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
-              " since Fetcher has been stopped");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) +
+                " since Fetcher has been stopped");
+          }
         } else {
           LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
           for (InputAttemptIdentifier left : failedTasks) {
@@ -328,7 +340,9 @@ class FetcherOrderedGrouped extends Thread {
       connectSucceeded = httpConnection.connect();
 
       if (stopped) {
-        LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
+        }
         return false;
       }
       input = httpConnection.getInputStream();
@@ -336,7 +350,9 @@ class FetcherOrderedGrouped extends Thread {
       return true;
     } catch (IOException ie) {
       if (stopped) {
-        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown");
+        }
         return false;
       }
       ioErrs.increment(1);
@@ -402,7 +418,9 @@ class FetcherOrderedGrouped extends Thread {
                 InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce);
             return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
           } else {
-            LOG.info("Already shutdown. Ignoring invalid map id error");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Already shutdown. Ignoring invalid map id error");
+            }
             return EMPTY_ATTEMPT_ID_ARRAY;
           }
         }
@@ -419,8 +437,10 @@ class FetcherOrderedGrouped extends Thread {
           // the remaining because we dont know where to start reading from. YARN-1773
           return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
         } else {
-          LOG.info("Already shutdown. Ignoring invalid map id error. Exception: " +
-              e.getClass().getName() + ", Message: " + e.getMessage());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " +
+                e.getClass().getName() + ", Message: " + e.getMessage());
+          }
           return EMPTY_ATTEMPT_ID_ARRAY;
         }
       }
@@ -436,7 +456,9 @@ class FetcherOrderedGrouped extends Thread {
           assert (srcAttemptId != null);
           return new InputAttemptIdentifier[]{srcAttemptId};
         } else {
-          LOG.info("Already stopped. Ignoring verification failure.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Already stopped. Ignoring verification failure.");
+          }
           return EMPTY_ATTEMPT_ID_ARRAY;
         }
       }
@@ -455,7 +477,9 @@ class FetcherOrderedGrouped extends Thread {
           ioErrs.increment(1);
           scheduler.reportLocalError(e);
         } else {
-          LOG.info("Already stopped. Ignoring error from merger.reserve");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Already stopped. Ignoring error from merger.reserve");
+          }
         }
         return EMPTY_ATTEMPT_ID_ARRAY;
       }
@@ -468,16 +492,19 @@ class FetcherOrderedGrouped extends Thread {
       } 
       
       // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getAttemptIdentifier() + " decomp: " +
-               decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("fetcher#" + id + " about to shuffle output of map " +
+            mapOutput.getAttemptIdentifier() + " decomp: " +
+            decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
+      }
+
       if (mapOutput.getType() == Type.MEMORY) {
         ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
           (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
           ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
       } else if (mapOutput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
-          input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+          input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
       } else {
         throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
             mapOutput.getType());
@@ -496,8 +523,10 @@ class FetcherOrderedGrouped extends Thread {
       return null;
     } catch (IOException ioe) {
       if (stopped) {
-        LOG.info("Not reporting fetch failure for exception during data copy: ["
-            + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Not reporting fetch failure for exception during data copy: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+        }
         cleanupCurrentConnection(true);
         if (mapOutput != null) {
           mapOutput.abort(); // Release resources
@@ -666,7 +695,10 @@ class FetcherOrderedGrouped extends Thread {
             LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
                 host.getHostIdentifier(), e);
           } else {
-            LOG.info("Ignoring fetch error during local disk copy since fetcher has already been stopped");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Ignoring fetch error during local disk copy since fetcher has already been stopped");
+            }
             return;
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e6ebd9..e25c064 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -58,14 +58,13 @@ import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
-import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 
-
 /**
  * Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart()
  *
@@ -219,10 +218,15 @@ public class MergeManager {
     } else {
       this.postMergeMemLimit = maxRedBuffer;
     }
-    
-    LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
-        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem="
-        + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          inputContext.getSourceVertexName() + ": " + "InitialRequest: ShuffleMem=" + memLimit +
+              ", postMergeMem=" + maxRedBuffer
+              + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable +
+              ". Updated to: ShuffleMem="
+              + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
+    }
 
     this.ioSortFactor = 
         conf.getInt(
@@ -252,10 +256,11 @@ public class MergeManager {
                conf.getFloat(
                    TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 
                    TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
-    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
+    LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + memoryLimit + ", " +
              "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
              "mergeThreshold=" + mergeThreshold + ", " + 
              "ioSortFactor=" + ioSortFactor + ", " +
+             "postMergeMem=" + postMergeMemLimit + ", " +
              "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
     
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
@@ -310,8 +315,6 @@ public class MergeManager {
       long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
           (long)(maxAvailableTaskMemory * maxInMemCopyUse));
       
-      LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
-
       float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
           TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
       if (maxRedPer > 1.0 || maxRedPer < 0.0) {
@@ -319,7 +322,9 @@ public class MergeManager {
       }
       long maxRedBuffer = (long) (maxAvailableTaskMemory * maxRedPer);
 
-      LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+    LOG.info("Initial Memory required for SHUFFLE_BUFFER=" + memLimit +
+        " based on INPUT_BUFFER_FACTOR=" + maxInMemCopyUse + ",  for final merged output=" +
+        maxRedBuffer + ", using factor: " + maxRedPer);
 
       long reqMem = Math.max(maxRedBuffer, memLimit);
       return reqMem;
@@ -373,9 +378,11 @@ public class MergeManager {
                                              int fetcher
                                              ) throws IOException {
     if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
-               " is greater than maxSingleShuffleLimit (" + 
-               maxSingleShuffleLimit + ")");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize +
+            " is greater than maxSingleShuffleLimit (" +
+            maxSingleShuffleLimit + ")");
+      }
       return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
           fetcher, true, mapOutputFile);
     }
@@ -437,9 +444,9 @@ public class MergeManager {
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
-        mapOutput);
+          + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+          + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
+          mapOutput);
 
     commitMemory+= mapOutput.getSize();
 
@@ -461,7 +468,7 @@ public class MergeManager {
   private void startMemToDiskMerge() {
     synchronized (inMemoryMerger) {
       if (!inMemoryMerger.isInProgress()) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+        LOG.info(inputContext.getSourceVertexName() + ": " + "Starting inMemoryMerger's merge since commitMemory=" +
             commitMemory + " > mergeThreshold=" + mergeThreshold +
             ". Current usedMemory=" + usedMemory);
         inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
@@ -473,7 +480,7 @@ public class MergeManager {
   
   public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
     inMemoryMergedMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
+    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
              ", inMemoryMergedMapOutputs.size() -> " + 
              inMemoryMergedMapOutputs.size());
   }
@@ -573,7 +580,7 @@ public class MergeManager {
       Writer writer = 
         new InMemoryWriter(mergedMapOutputs.getArrayStream());
 
-      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+      LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments +
                " segments of total-size: " + mergeOutputSize);
 
       // Nothing will be materialized to disk because the sort factor is being
@@ -590,7 +597,7 @@ public class MergeManager {
       TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
       writer.close();
 
-      LOG.info(inputContext.getUniqueIdentifier() +  
+      LOG.info(inputContext.getSourceVertexName() +
                " Memory-to-Memory merge of the " + noInMemorySegments +
                " files in-memory complete.");
 
@@ -642,7 +649,6 @@ public class MergeManager {
       Path outputPath = mapOutputFile.getInputFileForWrite(
           srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
-      LOG.info("Patch..InMemoryMerger outputPath: " + outputPath);
 
       Writer writer = null;
       long outFileLen = 0;
@@ -801,7 +807,7 @@ public class MergeManager {
       final long outputLen = localFS.getFileStatus(outputPath).getLen();
       closeOnDiskFile(new FileChunk(outputPath, 0, outputLen));
 
-      LOG.info(inputContext.getUniqueIdentifier() +
+      LOG.info(inputContext.getSourceVertexName() +
           " Finished merging " + inputs.size() + 
           " map output files on disk of total-size " + 
           approxOutputSize + "." +