You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/12 21:27:54 UTC

tez git commit: TEZ-1904. Fix findbugs warnings in tez-runtime-library module. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master c8bae3649 -> fda4c0bdc


TEZ-1904. Fix findbugs warnings in tez-runtime-library module. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fda4c0bd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fda4c0bd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fda4c0bd

Branch: refs/heads/master
Commit: fda4c0bdcfc1ddecf2872326bd1da50dfb1a9fc7
Parents: c8bae36
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 12 12:27:39 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 12 12:27:39 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-runtime-library/findbugs-exclude.xml        | 86 ++++++++++++++++++++
 .../vertexmanager/InputReadyVertexManager.java  |  8 +-
 .../vertexmanager/ShuffleVertexManager.java     |  9 +-
 .../library/api/TezRuntimeConfiguration.java    |  2 +-
 .../tez/runtime/library/common/Constants.java   |  2 +-
 .../common/security/SecureShuffleUtils.java     | 26 ++----
 .../runtime/library/common/shuffle/Fetcher.java |  8 +-
 .../library/common/shuffle/ShuffleUtils.java    |  2 +-
 .../common/shuffle/impl/ShuffleManager.java     |  2 +-
 .../shuffle/orderedgrouped/InMemoryReader.java  | 12 ++-
 .../shuffle/orderedgrouped/MergeManager.java    |  4 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  | 25 +++---
 .../orderedgrouped/ShuffleScheduler.java        |  2 +-
 .../runtime/library/common/sort/impl/IFile.java | 18 ++--
 .../common/sort/impl/IFileInputStream.java      |  7 +-
 .../common/sort/impl/PipelinedSorter.java       | 23 +++---
 .../library/common/sort/impl/TezMerger.java     | 20 ++---
 .../common/sort/impl/dflt/DefaultSorter.java    | 10 +--
 .../writers/UnorderedPartitionedKVWriter.java   |  2 +-
 .../library/input/OrderedGroupedKVInput.java    |  4 +-
 .../runtime/library/input/UnorderedKVInput.java |  4 +-
 .../library/processor/SleepProcessor.java       |  6 +-
 23 files changed, 178 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e16a2e9..70a827c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1904. Fix findbugs warnings in tez-runtime-library module.
   TEZ-1903. Fix findbugs warnings in tez-runtime-internal module.
   TEZ-1896. Move the default heartbeat timeout and checkinterval to TezConfiguration.
   TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure.

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 5b11308..45c194c 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -13,4 +13,90 @@
 -->
 <FindBugsFilter>
 
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler$Penalty"/>
+    <Method name="compareTo" params="java.lang.Object" returns="int"/>
+    <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SpanIterator"/>
+    <Method name="compareTo" params="org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SpanIterator" returns="int"/>
+    <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.comparator.TezBytesComparator"/>
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput$MapOutputComparator"/>
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader"/>
+    <Method name="&lt;init&gt;"/>
+    <Field name="buffer"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.shuffle.Fetcher"/>
+    <Method name="shutdownInternal" params="boolean" returns="void"/><Field name="isShutDown"/>
+    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+  </Match>
+
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.comparator.TezBytesComparator"/>
+    <Method name="getProxy" params="org.apache.hadoop.io.BytesWritable" returns="int"/>
+    <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
+    <Method name="&lt;init&gt;"/>
+    <Bug pattern="SC_START_IN_CTOR"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.Constants"/>
+    <Field name="MERGED_OUTPUT_PREFIX"/>
+    <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
+  </Match>
+
+  <!-- TODO This needs more looking into -->
+  <Match>
+    <Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
+    <Field name="kvindex"/>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.runtime\.library\.shuffle\.impl\.ShuffleUserPayloads\$.*Proto\$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 8f30276..11185ee 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -53,7 +53,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     super(context);
   }
 
-  class SourceVertexInfo {
+  static class SourceVertexInfo {
     EdgeProperty edgeProperty;
     int numTasks;
     int numFinishedTasks;
@@ -142,7 +142,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
     if (srcInfo.taskIsFinished[taskId.intValue()] == null) {
       // not a duplicate completion
-      srcInfo.taskIsFinished[taskId.intValue()] = new Boolean(true);
+      srcInfo.taskIsFinished[taskId.intValue()] = Boolean.valueOf(true);
       srcInfo.numFinishedTasks++;
       if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE) {
         oneToOneSrcTasksDoneCount[taskId.intValue()]++;
@@ -181,7 +181,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
       tasksToStart = Lists.newArrayListWithCapacity(numTasks);
       for (int i=0; i<numTasks; ++i) {
         taskIsStarted[i] = true;
-        tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+        tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
       }
     } else {
       // start only the ready 1-1 tasks
@@ -196,7 +196,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
           LOG.info("Starting task " + i + " for vertex: "
               + getContext().getVertexName() + " with location: "
               + ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
-          tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint));
+          tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), locationHint));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 05f94c5..a993449 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -142,7 +142,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   int bipartiteSources = 0;
   long completedSourceTasksOutputSize = 0;
 
-  class SourceVertexInfo {
+  static class SourceVertexInfo {
     EdgeProperty edgeProperty;
     boolean vertexIsConfigured;
     BitSet finishedTaskSet;
@@ -227,9 +227,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       int targetIndex = 
           sourceTaskIndex * partitionRange 
           + sourceIndex % partitionRange;
-      
-      destinationTaskAndInputIndices.put(new Integer(destinationTaskIndex),
-          Collections.singletonList(new Integer(targetIndex)));
+
+      destinationTaskAndInputIndices.put(destinationTaskIndex, Collections.singletonList(targetIndex));
     }
     
     @Override
@@ -394,7 +393,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   void updatePendingTasks() {
     pendingTasks.clear();
     for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) {
-      pendingTasks.add(new Integer(i));
+      pendingTasks.add(i);
     }
     totalTasksToSchedule = pendingTasks.size();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3c0f11c..11b33f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -94,7 +94,7 @@ public class TezRuntimeConfiguration {
 
   public static final String TEZ_RUNTIME_SORT_SPILL_PERCENT = TEZ_RUNTIME_PREFIX +
       "sort.spill.percent";
-  public static float TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT = 0.8f;
+  public static final float TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT = 0.8f;
 
 
   public static final String TEZ_RUNTIME_IO_SORT_MB = TEZ_RUNTIME_PREFIX + "io.sort.mb";

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index 2a5c795..d56d86c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -31,7 +31,7 @@ public class Constants {
   public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
 
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-  public static String MERGED_OUTPUT_PREFIX = ".merged";
+  public static final String MERGED_OUTPUT_PREFIX = ".merged";
 
   public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index b1424c0..f36d41d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -18,13 +18,12 @@
 package org.apache.tez.runtime.library.common.security;
 
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.net.URL;
 
 import javax.crypto.SecretKey;
 
+import com.google.common.base.Charsets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -47,7 +46,7 @@ public class SecureShuffleUtils {
    * @param msg
    */
   public static String generateHash(byte[] msg, SecretKey key) {
-    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)), Charsets.UTF_8);
   }
 
   /**
@@ -80,7 +79,7 @@ public class SecureShuffleUtils {
    */
   public static String hashFromString(String enc_str, JobTokenSecretManager mgr)
       throws IOException {
-    return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes())));
+    return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes(Charsets.UTF_8))), Charsets.UTF_8);
   }
   
   /**
@@ -91,9 +90,9 @@ public class SecureShuffleUtils {
    */
   public static void verifyReply(String base64Hash, String msg, JobTokenSecretManager mgr)
       throws IOException {
-    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes(Charsets.UTF_8));
 
-    boolean res = verifyHash(hash, msg.getBytes(), mgr);
+    boolean res = verifyHash(hash, msg.getBytes(Charsets.UTF_8), mgr);
 
     if(res != true) {
       throw new IOException("Verification of the hashReply failed");
@@ -118,19 +117,4 @@ public class SecureShuffleUtils {
   private static String buildMsgFrom(String uri_path, String uri_query, int port) {
     return String.valueOf(port) + uri_path + "?" + uri_query;
   }
-  
-  
-  /**
-   * byte array to Hex String
-   * @param ba
-   * @return string with HEX value of the key
-   */
-  public static String toHex(byte[] ba) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PrintStream ps = new PrintStream(baos);
-    for(byte b: ba) {
-      ps.printf("%x", b);
-    }
-    return baos.toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index e600f18..68951d5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -271,10 +271,8 @@ public class Fetcher implements Callable<FetchResult> {
             .suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
         if (!renamed) {
           localFs.delete(tmpIndex, false);
-          if (outputPath != null) {
-            // invariant: outputPath was renamed from tmpPath
-            localFs.delete(outputPath, false);
-          }
+          // invariant: outputPath was renamed from tmpPath
+          localFs.delete(outputPath, false);
           LOG.warn("Could not rename the index file to "
               + outputPath
                   .suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
@@ -505,7 +503,7 @@ public class Fetcher implements Callable<FetchResult> {
 
     if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
       LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " +
-          (failedInputs == null ? 0 : failedInputs.length) + " failed inputs");
+          failedInputs.length + " failed inputs");
       failedInputs = null;
     }
     return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index fb929e5..e01b985 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -48,7 +48,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 public class ShuffleUtils {
 
   private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
-  public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+  public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 
   public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 42013e3..13296c7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -688,7 +688,7 @@ public class ShuffleManager implements FetcherCallback {
    * Fake input that is added to the completed input list in case an input does not have any data.
    *
    */
-  private class NullFetchedInput extends FetchedInput {
+  private static class NullFetchedInput extends FetchedInput {
 
     public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
       super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 7ec56ec..75c552e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -83,12 +83,20 @@ public class InMemoryReader extends Reader {
     File dumpFile = new File("../output/" + taskAttemptId + ".dump");
     System.err.println("Dumping corrupt map-output of " + taskAttemptId +
                        " to " + dumpFile.getAbsolutePath());
+    FileOutputStream fos = null;
     try {
-      FileOutputStream fos = new FileOutputStream(dumpFile);
+      fos = new FileOutputStream(dumpFile);
       fos.write(buffer, 0, bufferSize);
-      fos.close();
     } catch (IOException ioe) {
       System.err.println("Failed to dump map-output of " + taskAttemptId);
+    } finally {
+      if (fos != null) {
+        try {
+          fos.close();
+        } catch (IOException e) {
+          System.err.println("Failed to dump map-output of " + taskAttemptId);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index ad50bb5..aca7dc0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -696,7 +696,7 @@ public class MergeManager {
         final boolean preserve = fileChunk.isLocalFile();
         final Path file = fileChunk.getPath();
         approxOutputSize += size;
-        Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
+        Segment segment = new Segment(rfs, file, offset, size, codec, ifileReadAhead,
             ifileReadAheadLength, ifileBufferSize, preserve);
         inputSegments.add(segment);
       }
@@ -924,7 +924,7 @@ public class MergeManager {
 
       final long fileOffset = fileChunk.getOffset();
       final boolean preserve = fileChunk.isLocalFile();
-      diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
+      diskSegments.add(new Segment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
                                    ifileReadAheadLength, ifileBufferSize, preserve, counter));
     }
     LOG.info("Merging " + onDisk.length + " files, " +

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index a9701a6..138ecb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.crypto.SecretKey;
 
@@ -95,7 +96,7 @@ public class Shuffle implements ExceptionReporter {
   private final int numFetchers;
   private final boolean localDiskFetchEnabled;
   
-  private Throwable throwable = null;
+  private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
   private String throwingThreadName = null;
 
   private final RunShuffleCallable runShuffleCallable;
@@ -260,8 +261,8 @@ public class Shuffle implements ExceptionReporter {
     if (isShutDown.get()) {
       throw new InputAlreadyClosedException();
     }
-    if (throwable != null) {
-      handleThrowable(throwable);
+    if (throwable.get() != null) {
+      handleThrowable(throwable.get());
     }
     if (runShuffleFuture == null) {
       return false;
@@ -301,8 +302,8 @@ public class Shuffle implements ExceptionReporter {
     if (isShutDown.get()) {
       throw new InputAlreadyClosedException();
     }
-    if (throwable != null) {
-      handleThrowable(throwable);
+    if (throwable.get() != null) {
+      handleThrowable(throwable.get());
     }
     return kvIter;
   }
@@ -342,10 +343,10 @@ public class Shuffle implements ExceptionReporter {
 
       
       while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-        synchronized (this) {
-          if (throwable != null) {
+        synchronized (Shuffle.this) {
+          if (throwable.get() != null) {
             throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                   throwable);
+                                   throwable.get());
           }
         }
       }
@@ -368,9 +369,9 @@ public class Shuffle implements ExceptionReporter {
 
       // Sanity check
       synchronized (Shuffle.this) {
-        if (throwable != null) {
+        if (throwable.get() != null) {
           throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                 throwable);
+                                 throwable.get());
         }
       }
 
@@ -450,8 +451,8 @@ public class Shuffle implements ExceptionReporter {
   @Private
   public synchronized void reportException(Throwable t) {
     // RunShuffleCallable onFailure deals with ignoring errors on shutdown.
-    if (throwable == null) {
-      throwable = t;
+    if (throwable.get() == null) {
+      throwable.set(t);
       throwingThreadName = Thread.currentThread().getName();
       // Notify the scheduler so that the reporting thread finds the 
       // exception immediately.

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 52e7334..066b94a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -472,7 +472,7 @@ class ShuffleScheduler {
       // This may be removed after TEZ-914
       InputAttemptIdentifier id = listItr.next();
       if (inputShouldBeConsumed(id)) {
-        Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+        Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex());
         InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
         if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
           dedupedList.put(inputNumber, id);

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index bd31151..049bcef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -64,7 +64,7 @@ public class IFile {
   public static final int RLE_MARKER = -2; // Repeat same key marker
   public static final int V_END_MARKER = -3; // End of values marker
   public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
-  public static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
+  static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
     (byte) 'F' , (byte) 0};
 
   private static final String INCOMPLETE_READ = "Requested to read %d got %d";
@@ -98,10 +98,9 @@ public class IFile {
 
     IFileOutputStream checksumOut;
 
-    Class keyClass;
-    Class valueClass;
-    Serializer keySerializer;
-    Serializer valueSerializer;
+    boolean closeSerializers = false;
+    Serializer keySerializer = null;
+    Serializer valueSerializer = null;
 
     final DataOutputBuffer buffer = new DataOutputBuffer();
     final DataOutputBuffer previous = new DataOutputBuffer();
@@ -165,16 +164,17 @@ public class IFile {
         this.out = new FSDataOutputStream(checksumOut,null);
       }
       writeHeader(outputStream);
-      this.keyClass = keyClass;
-      this.valueClass = valueClass;
 
       if (keyClass != null) {
+        this.closeSerializers = true;
         SerializationFactory serializationFactory =
           new SerializationFactory(conf);
         this.keySerializer = serializationFactory.getSerializer(keyClass);
         this.keySerializer.open(buffer);
         this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
+      } else {
+        this.closeSerializers = false;
       }
     }
 
@@ -197,7 +197,7 @@ public class IFile {
       // When IFile writer is created by BackupStore, we do not have
       // Key and Value classes set. So, check before closing the
       // serializers
-      if (keyClass != null) {
+      if (closeSerializers) {
         keySerializer.close();
         valueSerializer.close();
       }
@@ -506,7 +506,6 @@ public class IFile {
     byte keyBytes[] = new byte[0];
 
     long startPos;
-    protected boolean isCompressed = false;
 
     /**
      * Construct an IFile Reader.
@@ -565,7 +564,6 @@ public class IFile {
                   TezCounter readsCounter, TezCounter bytesReadCounter,
                   boolean readAhead, int readAheadLength,
                   int bufferSize, boolean isCompressed) throws IOException {
-      this.isCompressed = isCompressed;
       checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
       if (isCompressed && codec != null) {
         decompressor = CodecPool.getDecompressor(codec);

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index abfe4ad..e346793 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -23,6 +23,7 @@ import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -275,7 +276,7 @@ public class IFileInputStream extends InputStream {
             ", len=" + len +
             ", length=" + length +
             ", checksumSize=" + checksumSize+
-            ", csum=" + csum +
+            ", csum=" + Arrays.toString(csum) +
             ", sum=" + sum; 
         LOG.info(mesg);
 
@@ -298,10 +299,6 @@ public class IFileInputStream extends InputStream {
     return result;
   }
 
-  public byte[] getChecksum() {
-    return csum;
-  }
-
   void disableChecksumValidation() {
     disableChecksumValidation = true;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index ae654b4..2fef83b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -111,7 +111,7 @@ public class PipelinedSorter extends ExternalSorter {
     LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
     // TODO: configurable setting?
     span = new SortSpan(largeBuffer, 1024*1024, 16);
-    merger = new SpanMerger(comparator);
+    merger = new SpanMerger();
     final int sortThreads = 
             this.conf.getInt(
                 TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 
@@ -345,7 +345,7 @@ public class PipelinedSorter extends ExternalSorter {
         TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
         Segment s =
-            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
+            new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, ifileReadAhead,
                              ifileReadAheadLength, ifileBufferSize, true);
         segmentList.add(i, s);
@@ -405,7 +405,7 @@ public class PipelinedSorter extends ExternalSorter {
     int getPartition();
   }
 
-  private class BufferStreamWrapper extends OutputStream 
+  private static class BufferStreamWrapper extends OutputStream
   {
     private final ByteBuffer out;
     public BufferStreamWrapper(ByteBuffer out) {
@@ -420,7 +420,7 @@ public class PipelinedSorter extends ExternalSorter {
     public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
   }
 
-  protected class InputByteBuffer extends DataInputBuffer {
+  protected static class InputByteBuffer extends DataInputBuffer {
     private byte[] buffer = new byte[256]; 
     private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
     private void resize(int length) {
@@ -607,7 +607,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+  private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
     private int kvindex = -1;
     private int maxindex;
     private IntBuffer kvmeta;
@@ -617,7 +617,7 @@ public class PipelinedSorter extends ExternalSorter {
     private InputByteBuffer value = new InputByteBuffer();
     private Progress progress = new Progress();
 
-    private final int minrun = (1 << 4);
+    private static final int minrun = (1 << 4);
 
     public SpanIterator(SortSpan span) {
       this.kvmeta = span.kvmeta;
@@ -644,7 +644,7 @@ public class PipelinedSorter extends ExternalSorter {
       // caveat: since we use this as a comparable in the merger 
       if(kvindex == maxindex) return false;
       if(kvindex % 100 == 0) {
-          progress.set((kvindex-maxindex) / maxindex);
+          progress.set((kvindex-maxindex) / (float)maxindex);
       }
       kvindex += 1;
       return true;
@@ -741,7 +741,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  private class SortTask implements Callable<SpanIterator> {
+  private static class SortTask implements Callable<SpanIterator> {
     private final SortSpan sortable;
     private final IndexedSorter sorter;
     private final RawComparator comparator;
@@ -800,7 +800,7 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+  private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
     public SpanHeap() {
       super(256);
     }
@@ -814,7 +814,6 @@ public class PipelinedSorter extends ExternalSorter {
   }
 
   private class SpanMerger implements PartitionedRawKeyValueIterator {
-    private final RawComparator comparator;
     InputByteBuffer key = new InputByteBuffer();
     InputByteBuffer value = new InputByteBuffer();
     int partition;
@@ -827,11 +826,9 @@ public class PipelinedSorter extends ExternalSorter {
     private int gallop = 0;
     private SpanIterator horse;
     private long total = 0;
-    private long count = 0;
     private long eq = 0;
     
-    public SpanMerger(RawComparator comparator) {
-      this.comparator = comparator;
+    public SpanMerger() {
       partIter = new PartitionFilter(this);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 3c8e66a..ed9a59d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -224,7 +224,6 @@ public class TezMerger {
     Reader reader = null;
     final DataInputBuffer key = new DataInputBuffer();
     
-    Configuration conf = null;
     FileSystem fs = null;
     Path file = null;
     boolean preserve = false; // Signifies whether the segment should be kept after a merge is complete. Checked in the close method.
@@ -237,38 +236,37 @@ public class TezMerger {
     
     TezCounter mapOutputsCounter = null;
 
-    public Segment(Configuration conf, FileSystem fs, Path file,
+    public Segment(FileSystem fs, Path file,
         CompressionCodec codec, boolean ifileReadAhead,
         int ifileReadAheadLength, int bufferSize, boolean preserve)
     throws IOException {
-      this(conf, fs, file, codec, ifileReadAhead, ifileReadAheadLength,
+      this(fs, file, codec, ifileReadAhead, ifileReadAheadLength,
           bufferSize, preserve, null);
     }
 
-    public Segment(Configuration conf, FileSystem fs, Path file,
+    public Segment(FileSystem fs, Path file,
                    CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth,
                    int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter)
   throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec,
+      this(fs, file, 0, fs.getFileStatus(file).getLen(), codec,
           ifileReadAhead, ifileReadAheadLenth, bufferSize, preserve,
           mergedMapOutputsCounter);
     }
 
-    public Segment(Configuration conf, FileSystem fs, Path file,
+    public Segment(FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
                    CompressionCodec codec, boolean ifileReadAhead,
                    int ifileReadAheadLength,  int bufferSize, 
                    boolean preserve) throws IOException {
-      this(conf, fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
+      this(fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
           ifileReadAheadLength, bufferSize, preserve, null);
     }
 
-    public Segment(Configuration conf, FileSystem fs, Path file,
+    public Segment(FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
         boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
         boolean preserve, TezCounter mergedMapOutputsCounter)
     throws IOException {
-      this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
@@ -440,7 +438,7 @@ public class TezMerger {
       
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
-        segments.add(new Segment(conf, fs, file, codec, ifileReadAhead,
+        segments.add(new Segment(fs, file, codec, ifileReadAhead,
                                       ifileReadAheadLength, ifileBufferSize,
                                       !deleteInputs, 
                                        (file.toString().endsWith(
@@ -760,7 +758,7 @@ public class TezMerger {
 
           // Add the newly create segment to the list of segments to be merged
           Segment tempSegment = 
-            new Segment(conf, fs, outputFile, codec, ifileReadAhead,
+            new Segment(fs, outputFile, codec, ifileReadAhead,
                 ifileReadAheadLength, ifileBufferSize, false);
 
           // Insert new merged segment into the sorted list

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 773eaba..29ffba1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -648,8 +648,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     @Override
     public void run() {
       spillLock.lock();
-      spillThreadRunning = true;
       try {
+        spillThreadRunning = true;
         while (true) {
           spillDone.signal();
           while (!spillInProgress) {
@@ -720,7 +720,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         : kvmeta.capacity() + kvstart) / NMETA;
   }
 
-  private boolean isRLENeeded() {
+  private synchronized boolean isRLENeeded() {
     return (sameKey > (0.1 * totalKeys)) || (sameKey < 0);
   }
 
@@ -1009,10 +1009,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
       if (indexCacheList.size() == 0) {
         sameVolRename(mapOutputFile.getSpillIndexFile(0),
-          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
       } else {
         indexCacheList.get(0).writeToFile(
-          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
       }
       return;
     }
@@ -1072,7 +1072,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
           Segment s =
-            new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
+            new Segment(rfs, filename[i], indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, ifileReadAhead,
                              ifileReadAheadLength, ifileBufferSize, true);
           segmentList.add(i, s);

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 74095c6..2a1df40 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -413,8 +413,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   public List<Event> close() throws IOException, InterruptedException {
     isShutdown.set(true);
     spillLock.lock();
-    LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
     try {
+      LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
       while (pendingSpillCount.get() != 0 && spillException == null) {
         spillInProgress.await();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index f46f8f7..9031eb0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -243,6 +243,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
 
   @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
+    Shuffle shuffleLocalRef;
     synchronized (this) {
       if (getNumPhysicalInputs() == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
@@ -254,8 +255,9 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
         pendingEvents.addAll(inputEvents);
         return;
       }
+      shuffleLocalRef = shuffle;
     }
-    shuffle.handleEvents(inputEvents);
+    shuffleLocalRef.handleEvents(inputEvents);
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 0d02cb3..90e8f0d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -183,6 +183,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
 
   @Override
   public void handleEvents(List<Event> inputEvents) throws IOException {
+    ShuffleEventHandler inputEventHandlerLocalRef;
     synchronized (this) {
       if (getNumPhysicalInputs() == 0) {
         throw new RuntimeException("No input events expected as numInputs is 0");
@@ -197,8 +198,9 @@ public class UnorderedKVInput extends AbstractLogicalInput {
         pendingEvents.addAll(inputEvents);
         return;
       }
+      inputEventHandlerLocalRef = inputEventHandler;
     }
-    inputEventHandler.handleEvents(inputEvents);
+    inputEventHandlerLocalRef.handleEvents(inputEvents);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fda4c0bd/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index 540153a..1b9ea11 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -24,6 +24,7 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -101,7 +102,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
    */
   public static class SleepProcessorConfig {
     private int timeToSleepMS;
-    private final Charset charSet = Charset.forName("UTF-8");
+    private final Charset charSet = Charsets.UTF_8;
 
     public SleepProcessorConfig() {
     }
@@ -114,7 +115,8 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
     }
 
     public UserPayload toUserPayload() {
-      return UserPayload.create(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes()));
+      return UserPayload.create(ByteBuffer.wrap(Integer.toString(timeToSleepMS).getBytes(
+          charSet)));
     }
 
     public void fromUserPayload(UserPayload userPayload) throws CharacterCodingException {