You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/21 19:13:29 UTC

git commit: TEZ-570. Change IFileInputStream to not read from Configuration on instance creation. (sseth)

Updated Branches:
  refs/heads/master 5f587818d -> 03fa6d04d


TEZ-570. Change IFileInputStream to not read from Configuration on
instance creation. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/03fa6d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/03fa6d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/03fa6d04

Branch: refs/heads/master
Commit: 03fa6d04d48b6b4ad06bae6a259887b042625a24
Parents: 5f58781
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 21 10:13:17 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 21 10:13:17 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  9 ++-
 .../processor/map/TestMapProcessor.java         |  2 +-
 .../broadcast/input/BroadcastKVReader.java      | 29 ++++----
 .../BroadcastShuffleInputEventHandler.java      | 17 +++--
 .../input/BroadcastShuffleManager.java          | 28 +++++++-
 .../common/localshuffle/LocalShuffle.java       | 17 +++++
 .../library/common/shuffle/impl/Fetcher.java    | 29 ++++----
 .../common/shuffle/impl/InMemoryReader.java     |  2 +-
 .../common/shuffle/impl/MergeManager.java       | 34 ++++++---
 .../library/common/shuffle/impl/Shuffle.java    | 34 ++++++++-
 .../common/sort/impl/ExternalSorter.java        | 20 +++++-
 .../runtime/library/common/sort/impl/IFile.java | 21 +++---
 .../common/sort/impl/IFileInputStream.java      | 34 +++++----
 .../common/sort/impl/PipelinedSorter.java       | 11 ++-
 .../library/common/sort/impl/TezMerger.java     | 74 ++++++++++++++------
 .../common/sort/impl/dflt/DefaultSorter.java    |  7 +-
 .../library/input/ShuffledUnorderedKVInput.java |  3 +-
 .../runtime/library/shuffle/common/Fetcher.java | 52 ++++++++------
 .../library/shuffle/common/ShuffleUtils.java    | 11 ++-
 .../library/common/sort/impl/TestIFile.java     |  4 +-
 20 files changed, 303 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 88ea16d..005d465 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -44,17 +44,22 @@ public class TezJobConfig {
    */
   public static final String TEZ_RUNTIME_IFILE_READAHEAD =
       "tez.runtime.ifile.readahead";
-  public static final boolean DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD = true;
+  public static final boolean TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT = true;
 
   /**
    * Configuration key to set the IFile readahead length in bytes.
    */
   public static final String TEZ_RUNTIME_IFILE_READAHEAD_BYTES =
       "tez.runtime.ifile.readahead.bytes";
-  public static final int DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES =
+  public static final int TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT =
       4 * 1024 * 1024;
 
   /**
+   * TODO Maybe move this over from IFile into this file. -1 for now means ignore.
+   */
+  public static final int TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT = -1;
+  
+  /**
    * 
    */
   public static final String RECORDS_BEFORE_PROGRESS = 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index d3204fe..78191a4 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -150,7 +150,7 @@ public class TestMapProcessor {
     Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
     LOG.info("mapOutputFile = " + mapOutputFile);
     IFile.Reader reader =
-        new IFile.Reader(job, localFs, mapOutputFile, null, null);
+        new IFile.Reader(localFs, mapOutputFile, null, null, false, 0, -1);
     LongWritable key = new LongWritable();
     Text value = new Text();
     DataInputBuffer keyBuf = new DataInputBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 070f902..0b12a53 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -25,10 +25,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
@@ -42,7 +40,6 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
   private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
   
   private final BroadcastShuffleManager shuffleManager;
-  private final Configuration conf;
   private final CompressionCodec codec;
   
   private final Class<K> keyClass;
@@ -52,6 +49,10 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
   private final DataInputBuffer keyIn;
   private final DataInputBuffer valIn;
 
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
+  
   private K key;
   private V value;
   
@@ -60,18 +61,15 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
   
   private int numRecordsRead = 0;
   
-  public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
-      Configuration conf) throws IOException {
+  public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize)
+      throws IOException {
     this.shuffleManager = shuffleManager;
-    this.conf = conf;
 
-    if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.ifileBufferSize = ifileBufferSize;
 
     this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
     this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
@@ -182,8 +180,9 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
       return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
           mfi.getBytes(), 0, (int) mfi.getSize());
     } else {
-      return new IFile.Reader(conf, fetchedInput.getInputStream(),
-          fetchedInput.getSize(), codec, null);
+      return new IFile.Reader(fetchedInput.getInputStream(),
+          fetchedInput.getSize(), codec, null, ifileReadAhead,
+          ifileReadAheadLength, ifileBufferSize);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index 1c2092a..f9976d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -47,18 +46,22 @@ public class BroadcastShuffleInputEventHandler {
 
   private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
   
-  private final Configuration conf;
   private final BroadcastShuffleManager shuffleManager;
   private final FetchedInputAllocator inputAllocator;
   private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
   
-  public BroadcastShuffleInputEventHandler(TezInputContext inputContext, Configuration conf,
+  
+  public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
       BroadcastShuffleManager shuffleManager,
-      FetchedInputAllocator inputAllocator, CompressionCodec codec) {
-    this.conf = conf;
+      FetchedInputAllocator inputAllocator, CompressionCodec codec,
+      boolean ifileReadAhead, int ifileReadAheadLength) {
     this.shuffleManager = shuffleManager;
     this.inputAllocator = inputAllocator;
     this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
   }
 
   public void handleEvents(List<Event> events) throws IOException {
@@ -117,9 +120,9 @@ public class BroadcastShuffleInputEventHandler {
           .getData().newInput(), dataProto.getCompressedLength(), LOG);
       break;
     case MEMORY:
-      ShuffleUtils.shuffleToMemory(conf, (MemoryFetchedInput) fetchedInput,
+      ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
           dataProto.getData().newInput(), dataProto.getRawLength(),
-          dataProto.getCompressedLength(), codec, LOG);
+          dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
       break;
     case WAIT:
     default:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 09652d4..717f8ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -113,6 +113,10 @@ public class BroadcastShuffleManager implements FetcherCallback {
   private final int readTimeout;
   private final CompressionCodec codec;
   
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
+  
   private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
   
   private volatile Throwable shuffleError;
@@ -132,8 +136,23 @@ public class BroadcastShuffleManager implements FetcherCallback {
       codec = null;
     }
 
+    this.ifileReadAhead = conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+    
     this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
-    this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this.conf, this, this.inputManager, codec);
+    this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+        inputContext, this, this.inputManager, codec, ifileReadAhead,
+        ifileReadAheadLength);
 
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
@@ -269,6 +288,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
     }
+    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
 
     // Remove obsolete inputs from the list being given to the fetcher. Also
     // remove from the obsolete list.
@@ -531,7 +551,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
     return input;
   }
   /////////////////// End of methods for walking the available inputs
-  
+
+  @SuppressWarnings("rawtypes")
+  public BroadcastKVReader craeteReader() throws IOException {
+    return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
+  }
   
   /**
    * Fake input that is added to the completed input list in case an input does not have any data.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
index b40df6f..36723b0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -58,6 +58,10 @@ public class LocalShuffle {
   private final TezCounter spilledRecordsCounter;
   private final CompressionCodec codec;
   private final TezTaskOutput mapOutputFile;
+  
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
 
   public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
     this.inputContext = inputContext;
@@ -85,6 +89,18 @@ public class LocalShuffle {
     } else {
       this.codec = null;
     }
+    this.ifileReadAhead = conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
     
     // Always local
     this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
@@ -99,6 +115,7 @@ public class LocalShuffle {
     return TezMerger.merge(conf, rfs, 
         keyClass, valClass,
         codec, 
+        ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
         getMapFiles(),
         false, 
         sortFactor,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 7741122..fd88973 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -41,13 +41,10 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
@@ -90,6 +87,9 @@ class Fetcher extends Thread {
   private volatile boolean stopped = false;
 
   private Configuration job;
+  
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
 
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
@@ -97,7 +97,7 @@ class Fetcher extends Thread {
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
+      Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
     this.job = job;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -118,14 +118,15 @@ class Fetcher extends Thread {
     wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
 
-    if (ConfigUtils.isIntermediateInputCompressed(job)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, job);
-      decompressor = CodecPool.getDecompressor(codec);
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    
+    if (codec != null) {
+      this.codec = codec;
+      this.decompressor = CodecPool.getDecompressor(codec);
     } else {
-      codec = null;
-      decompressor = null;
+      this.codec = null;
+      this.decompressor = null;
     }
 
     this.connectionTimeout = 
@@ -151,8 +152,8 @@ class Fetcher extends Thread {
         }
       }
     }
-  }
-  
+  }  
+
   public void run() {
     try {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
@@ -547,7 +548,7 @@ class Fetcher extends Thread {
                                int decompressedLength, 
                                int compressedLength) throws IOException {    
     IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, job);
+      new IFileInputStream(input, compressedLength, ifileReadAhead, ifileReadAheadLength);
 
     input = checksumIn;       
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
index ae95268..479d704 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
@@ -45,7 +45,7 @@ public class InMemoryReader extends Reader {
   public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
-    super(null, null, length - start, null, null);
+    super(null, length - start, null, null, false, 0, -1);
     this.merger = merger;
     this.taskAttemptId = taskAttemptId;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 0abe530..27ebd5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -108,6 +108,10 @@ public class MergeManager {
   private final CompressionCodec codec;
   
   private volatile boolean finalMergeComplete = false;
+  
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
 
   public MergeManager(Configuration conf, 
                       FileSystem localFS,
@@ -140,6 +144,18 @@ public class MergeManager {
     } else {
       codec = null;
     }
+    this.ifileReadAhead = conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
     final float maxInMemCopyUse =
       conf.getFloat(
@@ -408,7 +424,7 @@ public class MergeManager {
                        new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
                        nullProgressable, null, null, null);
-      TezMerger.writeFile(rIter, writer, nullProgressable, conf);
+      TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
       writer.close();
 
       LOG.info(inputContext.getUniqueIdentifier() +  
@@ -476,7 +492,7 @@ public class MergeManager {
             nullProgressable, spilledRecordsCounter, null, null);
 
         if (null == combiner) {
-          TezMerger.writeFile(rIter, writer, nullProgressable, conf);
+          TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
         } else {
           runCombineProcessor(rIter, writer);
         }
@@ -552,13 +568,13 @@ public class MergeManager {
         iter = TezMerger.merge(conf, rfs,
                             (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                            codec, inputs.toArray(new Path[inputs.size()]), 
-                            true, ioSortFactor, tmpDir, 
+                            codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
+                            inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, 
                             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
                             nullProgressable, spilledRecordsCounter, null, 
                             mergedMapOutputsCounter, null);
 
-        TezMerger.writeFile(iter, writer, nullProgressable, conf);
+        TezMerger.writeFile(iter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
@@ -609,7 +625,8 @@ public class MergeManager {
 
     public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
         throws IOException {
-      super(null, null, size, null, spilledRecordsCounter);
+      super(null, size, null, spilledRecordsCounter, ifileReadAhead,
+          ifileReadAheadLength, ifileBufferSize);
       this.kvIter = kvIter;
     }
     public boolean nextRawKey(DataInputBuffer key) throws IOException {
@@ -698,7 +715,7 @@ public class MergeManager {
         final Writer writer = new Writer(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
-          TezMerger.writeFile(rIter, writer, nullProgressable, job);
+          TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
           // add to list of final disk outputs.
           onDiskMapOutputs.add(outputPath);
         } catch (IOException e) {
@@ -735,7 +752,8 @@ public class MergeManager {
       onDiskBytes += fs.getFileStatus(file).getLen();
       LOG.debug("Disk file: " + file + " Length is " + 
           fs.getFileStatus(file).getLen());
-      diskSegments.add(new Segment(job, fs, file, codec, false,
+      diskSegments.add(new Segment(job, fs, file, codec, ifileReadAhead,
+                                   ifileReadAheadLength, ifileBufferSize, false,
                                          (file.toString().endsWith(
                                              Constants.MERGED_OUTPUT_PREFIX) ?
                                           null : mergedMapOutputsCounter)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index acf987a..766ffea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -32,13 +32,17 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
@@ -65,6 +69,9 @@ public class Shuffle implements ExceptionReporter {
   private String throwingThreadName = null;
   private final int numInputs;
   private final SecretKey jobTokenSecret;
+  private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
 
   private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
 
@@ -81,6 +88,24 @@ public class Shuffle implements ExceptionReporter {
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
     
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      codec = null;
+    }
+    this.ifileReadAhead = conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    
     Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
     
     FileSystem localFS = FileSystem.getLocal(this.conf);
@@ -101,7 +126,9 @@ public class Shuffle implements ExceptionReporter {
     TezCounter mergedMapOutputsCounter =
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     
-    LOG.info("Shuffle assigned with " + numInputs + " inputs");
+    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+        + (codec == null ? "None" : codec.getClass().getName()) + 
+        "ifileReadAhead: " + ifileReadAhead);
 
     scheduler = new ShuffleScheduler(
           this.inputContext,
@@ -184,7 +211,10 @@ public class Shuffle implements ExceptionReporter {
               TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
       Fetcher[] fetchers = new Fetcher[numFetchers];
       for (int i = 0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
+        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics,
+            Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
+            codec, inputContext);
+        
         fetchers[i].start();
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index c362d98..e7519c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -62,7 +62,7 @@ public abstract class ExternalSorter {
   public abstract void flush() throws IOException;
 
   public abstract void write(Object key, Object value) throws IOException;
-
+  
   protected Progressable nullProgressable = new NullProgressable();
   protected TezOutputContext outputContext;
   protected Combiner combiner;
@@ -77,6 +77,10 @@ public abstract class ExternalSorter {
   protected SerializationFactory serializationFactory;
   protected Serializer keySerializer;
   protected Serializer valSerializer;
+  
+  protected boolean ifileReadAhead;
+  protected int ifileReadAheadLength;
+  protected int ifileBufferSize;
 
   protected IndexedSorter sorter;
 
@@ -129,6 +133,20 @@ public abstract class ExternalSorter {
       codec = null;
     }
 
+    this.ifileReadAhead = this.conf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+    
     // Task outputs
     mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 18583a5..c9cbc55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -378,12 +378,13 @@ public class IFile {
      * @param readsCounter Counter for records read from disk
      * @throws IOException
      */
-    public Reader(Configuration conf, FileSystem fs, Path file,
+    public Reader(FileSystem fs, Path file,
                   CompressionCodec codec,
-                  TezCounter readsCounter) throws IOException {
-      this(conf, fs.open(file), 
+                  TezCounter readsCounter, boolean ifileReadAhead,
+                  int ifileReadAheadLength, int bufferSize) throws IOException {
+      this(fs.open(file), 
            fs.getFileStatus(file).getLen(),
-           codec, readsCounter);
+           codec, readsCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
     }
 
     /**
@@ -397,11 +398,13 @@ public class IFile {
      * @param readsCounter Counter for records read from disk
      * @throws IOException
      */
-    public Reader(Configuration conf, InputStream in, long length, 
+    public Reader(InputStream in, long length, 
                   CompressionCodec codec,
-                  TezCounter readsCounter) throws IOException {
+                  TezCounter readsCounter,
+                  boolean readAhead, int readAheadLength,
+                  int bufferSize) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length, conf);
+      checksumIn = new IFileInputStream(in,length, readAhead, readAheadLength);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
@@ -416,8 +419,8 @@ public class IFile {
       this.dataIn = new DataInputStream(this.in);
       this.fileLength = length;
       
-      if (conf != null) {
-        bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+      if (bufferSize != -1) {
+        this.bufferSize = bufferSize;
       }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index e828c0b..69ff394 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -28,14 +28,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezJobConfig;
 /**
  * A checksum input stream, used for IFiles.
  * Used to validate the checksum of files created by {@link IFileOutputStream}. 
@@ -58,21 +56,31 @@ public class IFileInputStream extends InputStream {
 
   private ReadaheadRequest curReadahead = null;
   private ReadaheadPool raPool = ReadaheadPool.getInstance();
-  private boolean readahead;
-  private int readaheadLength;
+  private final boolean readahead;
+  private final int readaheadLength;
 
   public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
 
   private boolean disableChecksumValidation = false;
   
   /**
+   * Create a checksum input stream that reads without readAhead.
+   * @param in
+   * @param len
+   */
+  public IFileInputStream(InputStream in, long len) {
+    this(in, len, false, 0);
+  }
+  
+  /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
+   * @param readAhead Whether to attempt readAhead for this stream
+   * @param readAheadLength Number of bytes to readAhead if it is enabled
    */
-  public IFileInputStream(InputStream in, long len, Configuration conf) {
+  public IFileInputStream(InputStream in, long len, boolean readAhead, int readAheadLength) {
     this.in = in;
-    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
@@ -81,13 +89,15 @@ public class IFileInputStream extends InputStream {
     length = len;
     dataLength = length - checksumSize;
 
-    conf = (conf != null) ? conf : new Configuration();
-    readahead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD);
-    readaheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+    readahead = readAhead;
+    readaheadLength = readAheadLength;
 
-    doReadahead();
+    if (readahead) {
+      this.inFd = getFileDescriptorIfAvail(in);
+      doReadahead();
+    } else {
+      this.inFd = null;
+    }
   }
 
   private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 1b153ca..1c6ee5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -60,7 +60,11 @@ public class PipelinedSorter extends ExternalSorter {
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
   private final static int APPROX_HEADER_LENGTH = 150;
-    
+  
+  boolean ifileReadAhead;
+  int ifileReadAheadLength;
+  int ifileBufferSize;
+  
   int partitionBits;
   
   private static final int PARTITION = 0;        // partition offset in acct
@@ -354,7 +358,8 @@ public class PipelinedSorter extends ExternalSorter {
 
         Segment s =
             new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, true);
+                             indexRecord.getPartLength(), codec, ifileReadAhead,
+                             ifileReadAheadLength, ifileBufferSize, true);
         segmentList.add(i, s);
       }
 
@@ -380,7 +385,7 @@ public class PipelinedSorter extends ExternalSorter {
                            spilledRecordsCounter);
       writer.setRLE(merger.needsRLE());
       if (combiner == null || numSpills < minSpillsForCombine) {
-        TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
+        TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
       } else {
         runCombineProcessor(kvIter, writer);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index bb4b4a2..6e1d7ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -63,7 +63,8 @@ public class TezMerger {
   public static
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass, 
-                            CompressionCodec codec,
+                            CompressionCodec codec, boolean ifileReadAhead,
+                            int ifileReadAheadLength, int ifileBufferSize,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
@@ -72,7 +73,8 @@ public class TezMerger {
                             Progress mergePhase)
   throws IOException {
     return 
-      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
+                           ifileReadAheadLength, ifileBufferSize, comparator, 
                            reporter, null).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter, 
@@ -82,7 +84,8 @@ public class TezMerger {
   public static 
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass, 
-                            CompressionCodec codec,
+                            CompressionCodec codec, boolean ifileReadAhead,
+                            int ifileReadAheadLength, int ifileBufferSize,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator,
@@ -93,7 +96,8 @@ public class TezMerger {
                             Progress mergePhase)
   throws IOException {
     return 
-      new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, 
+      new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
+                           ifileReadAheadLength, ifileBufferSize, comparator, 
                            reporter, mergedMapOutputsCounter).merge(
                                            keyClass, valueClass,
                                            mergeFactor, tmpDir,
@@ -195,16 +199,13 @@ public class TezMerger {
 
   public static <K extends Object, V extends Object>
   void writeFile(TezRawKeyValueIterator records, Writer writer, 
-                 Progressable progressable, Configuration conf) 
+                 Progressable progressable, long recordsBeforeProgress) 
   throws IOException {
-    long progressBar = 
-        conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS, 
-            TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
     long recordCtr = 0;
     while(records.next()) {
       writer.append(records.getKey(), records.getValue());
       
-      if (((recordCtr++) % progressBar) == 0) {
+      if (((recordCtr++) % recordsBeforeProgress) == 0) {
         progressable.progress();
       }
     }
@@ -223,32 +224,41 @@ public class TezMerger {
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
+    boolean ifileReadAhead;
+    int ifileReadAheadLength;
+    int bufferSize = -1;
     
     TezCounter mapOutputsCounter = null;
 
     public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve)
+        CompressionCodec codec, boolean ifileReadAhead,
+        int ifileReadAheadLength, int bufferSize, boolean preserve)
     throws IOException {
-      this(conf, fs, file, codec, preserve, null);
+      this(conf, fs, file, codec, ifileReadAhead, ifileReadAheadLength,
+          bufferSize, preserve, null);
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve,
-                   TezCounter mergedMapOutputsCounter)
+                   CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth,
+                   int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter)
   throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
-           mergedMapOutputsCounter);
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec,
+          ifileReadAhead, ifileReadAheadLenth, bufferSize, preserve,
+          mergedMapOutputsCounter);
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
-                   CompressionCodec codec,
+                   CompressionCodec codec, boolean ifileReadAhead,
+                   int ifileReadAheadLength,  int bufferSize, 
                    boolean preserve) throws IOException {
-      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+      this(conf, fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
+          ifileReadAheadLength, bufferSize, preserve, null);
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
         boolean preserve, TezCounter mergedMapOutputsCounter)
     throws IOException {
       this.conf = conf;
@@ -256,6 +266,9 @@ public class TezMerger {
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
+      this.ifileReadAhead = ifileReadAhead;
+      this.ifileReadAheadLength =ifileReadAheadLength;
+      this.bufferSize = bufferSize;
 
       this.segmentOffset = segmentOffset;
       this.segmentLength = segmentLength;
@@ -281,7 +294,8 @@ public class TezMerger {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
         in.seek(segmentOffset);
-        reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+        reader = new Reader(in, segmentLength, codec, readsCounter,
+            ifileReadAhead, ifileReadAheadLength, bufferSize);
       }
       
       if (mapOutputsCounter != null) {
@@ -372,6 +386,10 @@ public class TezMerger {
     Configuration conf;
     FileSystem fs;
     CompressionCodec codec;
+    boolean ifileReadAhead = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
+    int ifileReadAheadLength = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
+    int ifileBufferSize = TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
+    long recordsBeforeProgress = TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS;
     
     List<Segment> segments = new ArrayList<Segment>();
     
@@ -401,11 +419,15 @@ public class TezMerger {
 
     public MergeQueue(Configuration conf, FileSystem fs, 
                       Path[] inputs, boolean deleteInputs, 
-                      CompressionCodec codec, RawComparator comparator,
-                      Progressable reporter, 
+                      CompressionCodec codec, boolean ifileReadAhead,
+                      int ifileReadAheadLength, int ifileBufferSize,
+                      RawComparator comparator, Progressable reporter, 
                       TezCounter mergedMapOutputsCounter) 
     throws IOException {
       this.conf = conf;
+      // this.recordsBeforeProgress =
+      // conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+      // TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
       this.fs = fs;
       this.codec = codec;
       this.comparator = comparator;
@@ -413,7 +435,9 @@ public class TezMerger {
       
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
-        segments.add(new Segment(conf, fs, file, codec, !deleteInputs, 
+        segments.add(new Segment(conf, fs, file, codec, ifileReadAhead,
+                                      ifileReadAheadLength, ifileBufferSize,
+                                      !deleteInputs, 
                                        (file.toString().endsWith(
                                            Constants.MERGED_OUTPUT_PREFIX) ? 
                                         null : mergedMapOutputsCounter)));
@@ -427,6 +451,9 @@ public class TezMerger {
         List<Segment> segments, RawComparator comparator,
         Progressable reporter, boolean sortSegments) {
       this.conf = conf;
+      // this.recordsBeforeProgress =
+      // conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+      // TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
       this.fs = fs;
       this.comparator = comparator;
       this.segments = segments;
@@ -664,7 +691,7 @@ public class TezMerger {
           Writer writer = 
             new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
                              writesCounter);
-          writeFile(this, writer, reporter, conf);
+          writeFile(this, writer, reporter, recordsBeforeProgress);
           writer.close();
           
           //we finished one single level merge; now clean up the priority 
@@ -673,7 +700,8 @@ public class TezMerger {
 
           // Add the newly create segment to the list of segments to be merged
           Segment tempSegment = 
-            new Segment(conf, fs, outputFile, codec, false);
+            new Segment(conf, fs, outputFile, codec, ifileReadAhead,
+                ifileReadAheadLength, ifileBufferSize, false);
 
           // Insert new merged segment into the sorted list
           int pos = Collections.binarySearch(segments, tempSegment,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 1ff486f..778fd3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -113,7 +113,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   @Override
   public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { 
     super.initialize(outputContext, conf, numOutputs);
-
+    
     // sanity checks
     final float spillper = this.conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -1051,7 +1051,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
           Segment s =
             new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, true);
+                             indexRecord.getPartLength(), codec, ifileReadAhead,
+                             ifileReadAheadLength, ifileBufferSize, true);
           segmentList.add(i, s);
 
           if (LOG.isDebugEnabled()) {
@@ -1084,7 +1085,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                 spilledRecordsCounter);
         if (combiner == null || numSpills < minSpillsForCombine) {
           TezMerger.writeFile(kvIter, writer,
-              nullProgressable, conf);
+              nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
         } else {
           runCombineProcessor(kvIter, writer);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 4f94210..c6ba6b1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -50,7 +50,6 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   public ShuffledUnorderedKVInput() {
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
@@ -59,7 +58,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     
     this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
     this.shuffleManager.run();
-    this.kvReader = new BroadcastKVReader(shuffleManager, conf);
+    this.kvReader = this.shuffleManager.craeteReader();
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index d9bc101..96f1caf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -69,14 +69,16 @@ public class Fetcher implements Callable<FetchResult> {
   private int connectionTimeout;
   private int readTimeout;
 
+  private boolean ifileReadAhead = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
+  private int ifileReadAheadLength = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
+  
   private final SecretKey shuffleSecret;
-  private final Configuration conf;
 
   private final FetcherCallback fetcherCallback;
   private final FetchedInputAllocator inputManager;
   private final ApplicationId appId;
 
-  private static boolean sslShuffle;
+  private static boolean sslShuffle = false;
   private static SSLFactory sslFactory;
   private static boolean sslFactoryInited;
 
@@ -103,29 +105,29 @@ public class Fetcher implements Callable<FetchResult> {
     this.inputManager = inputManager;
     this.shuffleSecret = shuffleSecret;
     this.appId = appId;
-    this.conf = conf;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
     
     // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
-    synchronized (Fetcher.class) {
-      if (!sslFactoryInited) {
-        sslFactoryInited = true;
-        sslShuffle = conf.getBoolean(
-            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
-        if (sslShuffle) {
-          sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-          try {
-            sslFactory.init();
-          } catch (Exception ex) {
-            sslFactory.destroy();
-            throw new RuntimeException(ex);
-          }
-        }
-      }
-    }
+    // Re-enable when ssl shuffle support is needed.
+//    synchronized (Fetcher.class) {
+//      if (!sslFactoryInited) {
+//        sslFactoryInited = true;
+//        sslShuffle = conf.getBoolean(
+//            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+//            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+//        if (sslShuffle) {
+//          sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+//          try {
+//            sslFactory.init();
+//          } catch (Exception ex) {
+//            sslFactory.destroy();
+//            throw new RuntimeException(ex);
+//          }
+//        }
+//      }
+//    }
   }
 
   @Override
@@ -261,9 +263,9 @@ public class Fetcher implements Callable<FetchResult> {
           + fetchedInput.getType());
 
       if (fetchedInput.getType() == Type.MEMORY) {
-        ShuffleUtils.shuffleToMemory(conf, (MemoryFetchedInput) fetchedInput,
+        ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
             input, (int) decompressedLength, (int) compressedLength, codec,
-            LOG);
+            ifileReadAhead, ifileReadAheadLength, LOG);
       } else {
         ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, input,
             compressedLength, LOG);
@@ -499,6 +501,12 @@ public class Fetcher implements Callable<FetchResult> {
       fetcher.readTimeout = readTimeout;
       return this;
     }
+    
+    public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
+      fetcher.ifileReadAhead = readAhead;
+      fetcher.ifileReadAheadLength = readAheadBytes;
+      return this;
+    }
 
     public FetcherBuilder assignWork(String host, int port, int partition,
         List<InputAttemptIdentifier> inputs) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 25718e1..094f0fe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
@@ -74,12 +73,12 @@ public class ShuffleUtils {
   }
   
   @SuppressWarnings("resource")
-  public static void shuffleToMemory(Configuration conf,
-      MemoryFetchedInput fetchedInput, InputStream input,
-      int decompressedLength, int compressedLength,
-      CompressionCodec codec, Log LOG) throws IOException {
+  public static void shuffleToMemory(MemoryFetchedInput fetchedInput,
+      InputStream input, int decompressedLength, int compressedLength,
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
+      Log LOG) throws IOException {
     IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
-        conf);
+        ifileReadAhead, ifileReadAheadLength);
 
     input = checksumIn;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 019fd0e..17b7174 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -96,7 +96,7 @@ public class TestIFile {
     List<KVPair> data = KVDataGen.generateTestData(true);
     writeTestFile(outputPath, false, data);
 
-    IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+    IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, false, 0, -1);
 
     readAndVerify(reader, data);
     reader.close();
@@ -129,7 +129,7 @@ public class TestIFile {
     List<KVPair> data = KVDataGen.generateTestData(true);
     writeTestFile(outputPath, true, data);
 
-    IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+    IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, false, 0, -1);
 
     readAndVerify(reader, data);
     reader.close();