You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/21 19:13:29 UTC
git commit: TEZ-570. Change IFileInputStream to not read from Configuration on instance creation. (sseth)
Updated Branches:
refs/heads/master 5f587818d -> 03fa6d04d
TEZ-570. Change IFileInputStream to not read from Configuration on instance creation. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/03fa6d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/03fa6d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/03fa6d04
Branch: refs/heads/master
Commit: 03fa6d04d48b6b4ad06bae6a259887b042625a24
Parents: 5f58781
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 21 10:13:17 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 21 10:13:17 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 9 ++-
.../processor/map/TestMapProcessor.java | 2 +-
.../broadcast/input/BroadcastKVReader.java | 29 ++++----
.../BroadcastShuffleInputEventHandler.java | 17 +++--
.../input/BroadcastShuffleManager.java | 28 +++++++-
.../common/localshuffle/LocalShuffle.java | 17 +++++
.../library/common/shuffle/impl/Fetcher.java | 29 ++++----
.../common/shuffle/impl/InMemoryReader.java | 2 +-
.../common/shuffle/impl/MergeManager.java | 34 ++++++---
.../library/common/shuffle/impl/Shuffle.java | 34 ++++++++-
.../common/sort/impl/ExternalSorter.java | 20 +++++-
.../runtime/library/common/sort/impl/IFile.java | 21 +++---
.../common/sort/impl/IFileInputStream.java | 34 +++++----
.../common/sort/impl/PipelinedSorter.java | 11 ++-
.../library/common/sort/impl/TezMerger.java | 74 ++++++++++++++------
.../common/sort/impl/dflt/DefaultSorter.java | 7 +-
.../library/input/ShuffledUnorderedKVInput.java | 3 +-
.../runtime/library/shuffle/common/Fetcher.java | 52 ++++++++------
.../library/shuffle/common/ShuffleUtils.java | 11 ++-
.../library/common/sort/impl/TestIFile.java | 4 +-
20 files changed, 303 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 88ea16d..005d465 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -44,17 +44,22 @@ public class TezJobConfig {
*/
public static final String TEZ_RUNTIME_IFILE_READAHEAD =
"tez.runtime.ifile.readahead";
- public static final boolean DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD = true;
+ public static final boolean TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT = true;
/**
* Configuration key to set the IFile readahead length in bytes.
*/
public static final String TEZ_RUNTIME_IFILE_READAHEAD_BYTES =
"tez.runtime.ifile.readahead.bytes";
- public static final int DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES =
+ public static final int TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT =
4 * 1024 * 1024;
/**
+ * TODO Maybe move this over from IFile into this file. -1 for now means ignore.
+ */
+ public static final int TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT = -1;
+
+ /**
*
*/
public static final String RECORDS_BEFORE_PROGRESS =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index d3204fe..78191a4 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -150,7 +150,7 @@ public class TestMapProcessor {
Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
- new IFile.Reader(job, localFs, mapOutputFile, null, null);
+ new IFile.Reader(localFs, mapOutputFile, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 070f902..0b12a53 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -25,10 +25,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
@@ -42,7 +40,6 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
private final BroadcastShuffleManager shuffleManager;
- private final Configuration conf;
private final CompressionCodec codec;
private final Class<K> keyClass;
@@ -52,6 +49,10 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
private final DataInputBuffer keyIn;
private final DataInputBuffer valIn;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
+
private K key;
private V value;
@@ -60,18 +61,15 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
private int numRecordsRead = 0;
- public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
- Configuration conf) throws IOException {
+ public BroadcastKVReader(BroadcastShuffleManager shuffleManager, Configuration conf,
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize)
+ throws IOException {
this.shuffleManager = shuffleManager;
- this.conf = conf;
- if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
+ this.codec = codec;
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
+ this.ifileBufferSize = ifileBufferSize;
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
@@ -182,8 +180,9 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
mfi.getBytes(), 0, (int) mfi.getSize());
} else {
- return new IFile.Reader(conf, fetchedInput.getInputStream(),
- fetchedInput.getSize(), codec, null);
+ return new IFile.Reader(fetchedInput.getInputStream(),
+ fetchedInput.getSize(), codec, null, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index 1c2092a..f9976d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
@@ -47,18 +46,22 @@ public class BroadcastShuffleInputEventHandler {
private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
- private final Configuration conf;
private final BroadcastShuffleManager shuffleManager;
private final FetchedInputAllocator inputAllocator;
private final CompressionCodec codec;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
- public BroadcastShuffleInputEventHandler(TezInputContext inputContext, Configuration conf,
+
+ public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
BroadcastShuffleManager shuffleManager,
- FetchedInputAllocator inputAllocator, CompressionCodec codec) {
- this.conf = conf;
+ FetchedInputAllocator inputAllocator, CompressionCodec codec,
+ boolean ifileReadAhead, int ifileReadAheadLength) {
this.shuffleManager = shuffleManager;
this.inputAllocator = inputAllocator;
this.codec = codec;
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
}
public void handleEvents(List<Event> events) throws IOException {
@@ -117,9 +120,9 @@ public class BroadcastShuffleInputEventHandler {
.getData().newInput(), dataProto.getCompressedLength(), LOG);
break;
case MEMORY:
- ShuffleUtils.shuffleToMemory(conf, (MemoryFetchedInput) fetchedInput,
+ ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
dataProto.getData().newInput(), dataProto.getRawLength(),
- dataProto.getCompressedLength(), codec, LOG);
+ dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
break;
case WAIT:
default:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 09652d4..717f8ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -113,6 +113,10 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final int readTimeout;
private final CompressionCodec codec;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
+
private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
private volatile Throwable shuffleError;
@@ -132,8 +136,23 @@ public class BroadcastShuffleManager implements FetcherCallback {
codec = null;
}
+ this.ifileReadAhead = conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ if (this.ifileReadAhead) {
+ this.ifileReadAheadLength = conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ } else {
+ this.ifileReadAheadLength = 0;
+ }
+ this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
- this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this.conf, this, this.inputManager, codec);
+ this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+ inputContext, this, this.inputManager, codec, ifileReadAhead,
+ ifileReadAheadLength);
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
@@ -269,6 +288,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
}
+ fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
// Remove obsolete inputs from the list being given to the fetcher. Also
// remove from the obsolete list.
@@ -531,7 +551,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
return input;
}
/////////////////// End of methods for walking the available inputs
-
+
+ @SuppressWarnings("rawtypes")
+ public BroadcastKVReader craeteReader() throws IOException {
+ return new BroadcastKVReader(this, conf, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
+ }
/**
* Fake input that is added to the completed input list in case an input does not have any data.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
index b40df6f..36723b0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/localshuffle/LocalShuffle.java
@@ -58,6 +58,10 @@ public class LocalShuffle {
private final TezCounter spilledRecordsCounter;
private final CompressionCodec codec;
private final TezTaskOutput mapOutputFile;
+
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
this.inputContext = inputContext;
@@ -85,6 +89,18 @@ public class LocalShuffle {
} else {
this.codec = null;
}
+ this.ifileReadAhead = conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ if (this.ifileReadAhead) {
+ this.ifileReadAheadLength = conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ } else {
+ this.ifileReadAheadLength = 0;
+ }
+ this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
// Always local
this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
@@ -99,6 +115,7 @@ public class LocalShuffle {
return TezMerger.merge(conf, rfs,
keyClass, valClass,
codec,
+ ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
getMapFiles(),
false,
sortFactor,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 7741122..fd88973 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -41,13 +41,10 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
@@ -90,6 +87,9 @@ class Fetcher extends Thread {
private volatile boolean stopped = false;
private Configuration job;
+
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
private static boolean sslShuffle;
private static SSLFactory sslFactory;
@@ -97,7 +97,7 @@ class Fetcher extends Thread {
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
ShuffleClientMetrics metrics,
- Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
+ Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
this.job = job;
this.scheduler = scheduler;
this.merger = merger;
@@ -118,14 +118,15 @@ class Fetcher extends Thread {
wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_REDUCE.toString());
- if (ConfigUtils.isIntermediateInputCompressed(job)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
- decompressor = CodecPool.getDecompressor(codec);
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength = ifileReadAheadLength;
+
+ if (codec != null) {
+ this.codec = codec;
+ this.decompressor = CodecPool.getDecompressor(codec);
} else {
- codec = null;
- decompressor = null;
+ this.codec = null;
+ this.decompressor = null;
}
this.connectionTimeout =
@@ -151,8 +152,8 @@ class Fetcher extends Thread {
}
}
}
- }
-
+ }
+
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
@@ -547,7 +548,7 @@ class Fetcher extends Thread {
int decompressedLength,
int compressedLength) throws IOException {
IFileInputStream checksumIn =
- new IFileInputStream(input, compressedLength, job);
+ new IFileInputStream(input, compressedLength, ifileReadAhead, ifileReadAheadLength);
input = checksumIn;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
index ae95268..479d704 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
@@ -45,7 +45,7 @@ public class InMemoryReader extends Reader {
public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
byte[] data, int start, int length)
throws IOException {
- super(null, null, length - start, null, null);
+ super(null, length - start, null, null, false, 0, -1);
this.merger = merger;
this.taskAttemptId = taskAttemptId;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 0abe530..27ebd5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -108,6 +108,10 @@ public class MergeManager {
private final CompressionCodec codec;
private volatile boolean finalMergeComplete = false;
+
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
+ private final int ifileBufferSize;
public MergeManager(Configuration conf,
FileSystem localFS,
@@ -140,6 +144,18 @@ public class MergeManager {
} else {
codec = null;
}
+ this.ifileReadAhead = conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ if (this.ifileReadAhead) {
+ this.ifileReadAheadLength = conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ } else {
+ this.ifileReadAheadLength = 0;
+ }
+ this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
final float maxInMemCopyUse =
conf.getFloat(
@@ -408,7 +424,7 @@ public class MergeManager {
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
nullProgressable, null, null, null);
- TezMerger.writeFile(rIter, writer, nullProgressable, conf);
+ TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
writer.close();
LOG.info(inputContext.getUniqueIdentifier() +
@@ -476,7 +492,7 @@ public class MergeManager {
nullProgressable, spilledRecordsCounter, null, null);
if (null == combiner) {
- TezMerger.writeFile(rIter, writer, nullProgressable, conf);
+ TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
} else {
runCombineProcessor(rIter, writer);
}
@@ -552,13 +568,13 @@ public class MergeManager {
iter = TezMerger.merge(conf, rfs,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, inputs.toArray(new Path[inputs.size()]),
- true, ioSortFactor, tmpDir,
+ codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
+ inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir,
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
nullProgressable, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
- TezMerger.writeFile(iter, writer, nullProgressable, conf);
+ TezMerger.writeFile(iter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
writer.close();
} catch (IOException e) {
localFS.delete(outputPath, true);
@@ -609,7 +625,8 @@ public class MergeManager {
public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
throws IOException {
- super(null, null, size, null, spilledRecordsCounter);
+ super(null, size, null, spilledRecordsCounter, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize);
this.kvIter = kvIter;
}
public boolean nextRawKey(DataInputBuffer key) throws IOException {
@@ -698,7 +715,7 @@ public class MergeManager {
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
- TezMerger.writeFile(rIter, writer, nullProgressable, job);
+ TezMerger.writeFile(rIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
// add to list of final disk outputs.
onDiskMapOutputs.add(outputPath);
} catch (IOException e) {
@@ -735,7 +752,8 @@ public class MergeManager {
onDiskBytes += fs.getFileStatus(file).getLen();
LOG.debug("Disk file: " + file + " Length is " +
fs.getFileStatus(file).getLen());
- diskSegments.add(new Segment(job, fs, file, codec, false,
+ diskSegments.add(new Segment(job, fs, file, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, false,
(file.toString().endsWith(
Constants.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index acf987a..766ffea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -32,13 +32,17 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
@@ -65,6 +69,9 @@ public class Shuffle implements ExceptionReporter {
private String throwingThreadName = null;
private final int numInputs;
private final SecretKey jobTokenSecret;
+ private final CompressionCodec codec;
+ private final boolean ifileReadAhead;
+ private final int ifileReadAheadLength;
private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
@@ -81,6 +88,24 @@ public class Shuffle implements ExceptionReporter {
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+ Class<? extends CompressionCodec> codecClass =
+ ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } else {
+ codec = null;
+ }
+ this.ifileReadAhead = conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ if (this.ifileReadAhead) {
+ this.ifileReadAheadLength = conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ } else {
+ this.ifileReadAheadLength = 0;
+ }
+
Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
FileSystem localFS = FileSystem.getLocal(this.conf);
@@ -101,7 +126,9 @@ public class Shuffle implements ExceptionReporter {
TezCounter mergedMapOutputsCounter =
inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
- LOG.info("Shuffle assigned with " + numInputs + " inputs");
+ LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+ + (codec == null ? "None" : codec.getClass().getName()) +
+ "ifileReadAhead: " + ifileReadAhead);
scheduler = new ShuffleScheduler(
this.inputContext,
@@ -184,7 +211,10 @@ public class Shuffle implements ExceptionReporter {
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
Fetcher[] fetchers = new Fetcher[numFetchers];
for (int i = 0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
+ fetchers[i] = new Fetcher(conf, scheduler, merger, metrics,
+ Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
+ codec, inputContext);
+
fetchers[i].start();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index c362d98..e7519c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -62,7 +62,7 @@ public abstract class ExternalSorter {
public abstract void flush() throws IOException;
public abstract void write(Object key, Object value) throws IOException;
-
+
protected Progressable nullProgressable = new NullProgressable();
protected TezOutputContext outputContext;
protected Combiner combiner;
@@ -77,6 +77,10 @@ public abstract class ExternalSorter {
protected SerializationFactory serializationFactory;
protected Serializer keySerializer;
protected Serializer valSerializer;
+
+ protected boolean ifileReadAhead;
+ protected int ifileReadAheadLength;
+ protected int ifileBufferSize;
protected IndexedSorter sorter;
@@ -129,6 +133,20 @@ public abstract class ExternalSorter {
codec = null;
}
+ this.ifileReadAhead = this.conf.getBoolean(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+ if (this.ifileReadAhead) {
+ this.ifileReadAheadLength = conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+ } else {
+ this.ifileReadAheadLength = 0;
+ }
+ this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+ TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+
// Task outputs
mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 18583a5..c9cbc55 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -378,12 +378,13 @@ public class IFile {
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
- public Reader(Configuration conf, FileSystem fs, Path file,
+ public Reader(FileSystem fs, Path file,
CompressionCodec codec,
- TezCounter readsCounter) throws IOException {
- this(conf, fs.open(file),
+ TezCounter readsCounter, boolean ifileReadAhead,
+ int ifileReadAheadLength, int bufferSize) throws IOException {
+ this(fs.open(file),
fs.getFileStatus(file).getLen(),
- codec, readsCounter);
+ codec, readsCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
}
/**
@@ -397,11 +398,13 @@ public class IFile {
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
- public Reader(Configuration conf, InputStream in, long length,
+ public Reader(InputStream in, long length,
CompressionCodec codec,
- TezCounter readsCounter) throws IOException {
+ TezCounter readsCounter,
+ boolean readAhead, int readAheadLength,
+ int bufferSize) throws IOException {
readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length, conf);
+ checksumIn = new IFileInputStream(in,length, readAhead, readAheadLength);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
@@ -416,8 +419,8 @@ public class IFile {
this.dataIn = new DataInputStream(this.in);
this.fileLength = length;
- if (conf != null) {
- bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ if (bufferSize != -1) {
+ this.bufferSize = bufferSize;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index e828c0b..69ff394 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -28,14 +28,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezJobConfig;
/**
* A checksum input stream, used for IFiles.
* Used to validate the checksum of files created by {@link IFileOutputStream}.
@@ -58,21 +56,31 @@ public class IFileInputStream extends InputStream {
private ReadaheadRequest curReadahead = null;
private ReadaheadPool raPool = ReadaheadPool.getInstance();
- private boolean readahead;
- private int readaheadLength;
+ private final boolean readahead;
+ private final int readaheadLength;
public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
private boolean disableChecksumValidation = false;
/**
+ * Create a checksum input stream that reads without readAhead.
+ * @param in
+ * @param len
+ */
+ public IFileInputStream(InputStream in, long len) {
+ this(in, len, false, 0);
+ }
+
+ /**
* Create a checksum input stream that reads
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
+ * @param readAhead Whether to attempt readAhead for this stream
+ * @param readAheadLength Number of bytes to readAhead if it is enabled
*/
- public IFileInputStream(InputStream in, long len, Configuration conf) {
+ public IFileInputStream(InputStream in, long len, boolean readAhead, int readAheadLength) {
this.in = in;
- this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
@@ -81,13 +89,15 @@ public class IFileInputStream extends InputStream {
length = len;
dataLength = length - checksumSize;
- conf = (conf != null) ? conf : new Configuration();
- readahead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD);
- readaheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+ readahead = readAhead;
+ readaheadLength = readAheadLength;
- doReadahead();
+ if (readahead) {
+ this.inFd = getFileDescriptorIfAvail(in);
+ doReadahead();
+ } else {
+ this.inFd = null;
+ }
}
private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 1b153ca..1c6ee5d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -60,7 +60,11 @@ public class PipelinedSorter extends ExternalSorter {
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
private final static int APPROX_HEADER_LENGTH = 150;
-
+
+ boolean ifileReadAhead;
+ int ifileReadAheadLength;
+ int ifileBufferSize;
+
int partitionBits;
private static final int PARTITION = 0; // partition offset in acct
@@ -354,7 +358,8 @@ public class PipelinedSorter extends ExternalSorter {
Segment s =
new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
- indexRecord.getPartLength(), codec, true);
+ indexRecord.getPartLength(), codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(i, s);
}
@@ -380,7 +385,7 @@ public class PipelinedSorter extends ExternalSorter {
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
if (combiner == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
+ TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
} else {
runCombineProcessor(kvIter, writer);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index bb4b4a2..6e1d7ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -63,7 +63,8 @@ public class TezMerger {
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
Class keyClass, Class valueClass,
- CompressionCodec codec,
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int ifileBufferSize,
Path[] inputs, boolean deleteInputs,
int mergeFactor, Path tmpDir,
RawComparator comparator, Progressable reporter,
@@ -72,7 +73,8 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+ new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, comparator,
reporter, null).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
@@ -82,7 +84,8 @@ public class TezMerger {
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
Class keyClass, Class valueClass,
- CompressionCodec codec,
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int ifileBufferSize,
Path[] inputs, boolean deleteInputs,
int mergeFactor, Path tmpDir,
RawComparator comparator,
@@ -93,7 +96,8 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+ new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, comparator,
reporter, mergedMapOutputsCounter).merge(
keyClass, valueClass,
mergeFactor, tmpDir,
@@ -195,16 +199,13 @@ public class TezMerger {
public static <K extends Object, V extends Object>
void writeFile(TezRawKeyValueIterator records, Writer writer,
- Progressable progressable, Configuration conf)
+ Progressable progressable, long recordsBeforeProgress)
throws IOException {
- long progressBar =
- conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
- TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
long recordCtr = 0;
while(records.next()) {
writer.append(records.getKey(), records.getValue());
- if (((recordCtr++) % progressBar) == 0) {
+ if (((recordCtr++) % recordsBeforeProgress) == 0) {
progressable.progress();
}
}
@@ -223,32 +224,41 @@ public class TezMerger {
CompressionCodec codec = null;
long segmentOffset = 0;
long segmentLength = -1;
+ boolean ifileReadAhead;
+ int ifileReadAheadLength;
+ int bufferSize = -1;
TezCounter mapOutputsCounter = null;
public Segment(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec, boolean preserve)
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int bufferSize, boolean preserve)
throws IOException {
- this(conf, fs, file, codec, preserve, null);
+ this(conf, fs, file, codec, ifileReadAhead, ifileReadAheadLength,
+ bufferSize, preserve, null);
}
public Segment(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec, boolean preserve,
- TezCounter mergedMapOutputsCounter)
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth,
+ int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter)
throws IOException {
- this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
- mergedMapOutputsCounter);
+ this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec,
+ ifileReadAhead, ifileReadAheadLenth, bufferSize, preserve,
+ mergedMapOutputsCounter);
}
public Segment(Configuration conf, FileSystem fs, Path file,
long segmentOffset, long segmentLength,
- CompressionCodec codec,
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int bufferSize,
boolean preserve) throws IOException {
- this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+ this(conf, fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
+ ifileReadAheadLength, bufferSize, preserve, null);
}
public Segment(Configuration conf, FileSystem fs, Path file,
long segmentOffset, long segmentLength, CompressionCodec codec,
+ boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
boolean preserve, TezCounter mergedMapOutputsCounter)
throws IOException {
this.conf = conf;
@@ -256,6 +266,9 @@ public class TezMerger {
this.file = file;
this.codec = codec;
this.preserve = preserve;
+ this.ifileReadAhead = ifileReadAhead;
+ this.ifileReadAheadLength =ifileReadAheadLength;
+ this.bufferSize = bufferSize;
this.segmentOffset = segmentOffset;
this.segmentLength = segmentLength;
@@ -281,7 +294,8 @@ public class TezMerger {
if (reader == null) {
FSDataInputStream in = fs.open(file);
in.seek(segmentOffset);
- reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+ reader = new Reader(in, segmentLength, codec, readsCounter,
+ ifileReadAhead, ifileReadAheadLength, bufferSize);
}
if (mapOutputsCounter != null) {
@@ -372,6 +386,10 @@ public class TezMerger {
Configuration conf;
FileSystem fs;
CompressionCodec codec;
+ boolean ifileReadAhead = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
+ int ifileReadAheadLength = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
+ int ifileBufferSize = TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
+ long recordsBeforeProgress = TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS;
List<Segment> segments = new ArrayList<Segment>();
@@ -401,11 +419,15 @@ public class TezMerger {
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
- CompressionCodec codec, RawComparator comparator,
- Progressable reporter,
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int ifileBufferSize,
+ RawComparator comparator, Progressable reporter,
TezCounter mergedMapOutputsCounter)
throws IOException {
this.conf = conf;
+ // this.recordsBeforeProgress =
+ // conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+ // TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
this.fs = fs;
this.codec = codec;
this.comparator = comparator;
@@ -413,7 +435,9 @@ public class TezMerger {
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
- segments.add(new Segment(conf, fs, file, codec, !deleteInputs,
+ segments.add(new Segment(conf, fs, file, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize,
+ !deleteInputs,
(file.toString().endsWith(
Constants.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter)));
@@ -427,6 +451,9 @@ public class TezMerger {
List<Segment> segments, RawComparator comparator,
Progressable reporter, boolean sortSegments) {
this.conf = conf;
+ // this.recordsBeforeProgress =
+ // conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+ // TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
this.fs = fs;
this.comparator = comparator;
this.segments = segments;
@@ -664,7 +691,7 @@ public class TezMerger {
Writer writer =
new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
writesCounter);
- writeFile(this, writer, reporter, conf);
+ writeFile(this, writer, reporter, recordsBeforeProgress);
writer.close();
//we finished one single level merge; now clean up the priority
@@ -673,7 +700,8 @@ public class TezMerger {
// Add the newly create segment to the list of segments to be merged
Segment tempSegment =
- new Segment(conf, fs, outputFile, codec, false);
+ new Segment(conf, fs, outputFile, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, false);
// Insert new merged segment into the sorted list
int pos = Collections.binarySearch(segments, tempSegment,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 1ff486f..778fd3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -113,7 +113,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
@Override
public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
super.initialize(outputContext, conf, numOutputs);
-
+
// sanity checks
final float spillper = this.conf.getFloat(
TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -1051,7 +1051,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Segment s =
new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
- indexRecord.getPartLength(), codec, true);
+ indexRecord.getPartLength(), codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
@@ -1084,7 +1085,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spilledRecordsCounter);
if (combiner == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
- nullProgressable, conf);
+ nullProgressable, TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
} else {
runCombineProcessor(kvIter, writer);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 4f94210..c6ba6b1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -50,7 +50,6 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
public ShuffledUnorderedKVInput() {
}
- @SuppressWarnings("rawtypes")
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
@@ -59,7 +58,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
this.shuffleManager.run();
- this.kvReader = new BroadcastKVReader(shuffleManager, conf);
+ this.kvReader = this.shuffleManager.craeteReader();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index d9bc101..96f1caf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -69,14 +69,16 @@ public class Fetcher implements Callable<FetchResult> {
private int connectionTimeout;
private int readTimeout;
+ private boolean ifileReadAhead = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
+ private int ifileReadAheadLength = TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
+
private final SecretKey shuffleSecret;
- private final Configuration conf;
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
- private static boolean sslShuffle;
+ private static boolean sslShuffle = false;
private static SSLFactory sslFactory;
private static boolean sslFactoryInited;
@@ -103,29 +105,29 @@ public class Fetcher implements Callable<FetchResult> {
this.inputManager = inputManager;
this.shuffleSecret = shuffleSecret;
this.appId = appId;
- this.conf = conf;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
// TODO NEWTEZ Ideally, move this out from here into a static initializer block.
- synchronized (Fetcher.class) {
- if (!sslFactoryInited) {
- sslFactoryInited = true;
- sslShuffle = conf.getBoolean(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
- if (sslShuffle) {
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- try {
- sslFactory.init();
- } catch (Exception ex) {
- sslFactory.destroy();
- throw new RuntimeException(ex);
- }
- }
- }
- }
+ // Re-enable when ssl shuffle support is needed.
+// synchronized (Fetcher.class) {
+// if (!sslFactoryInited) {
+// sslFactoryInited = true;
+// sslShuffle = conf.getBoolean(
+// TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+// TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+// if (sslShuffle) {
+// sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+// try {
+// sslFactory.init();
+// } catch (Exception ex) {
+// sslFactory.destroy();
+// throw new RuntimeException(ex);
+// }
+// }
+// }
+// }
}
@Override
@@ -261,9 +263,9 @@ public class Fetcher implements Callable<FetchResult> {
+ fetchedInput.getType());
if (fetchedInput.getType() == Type.MEMORY) {
- ShuffleUtils.shuffleToMemory(conf, (MemoryFetchedInput) fetchedInput,
+ ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
input, (int) decompressedLength, (int) compressedLength, codec,
- LOG);
+ ifileReadAhead, ifileReadAheadLength, LOG);
} else {
ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, input,
compressedLength, LOG);
@@ -499,6 +501,12 @@ public class Fetcher implements Callable<FetchResult> {
fetcher.readTimeout = readTimeout;
return this;
}
+
+ public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
+ fetcher.ifileReadAhead = readAhead;
+ fetcher.ifileReadAheadLength = readAheadBytes;
+ return this;
+ }
public FetcherBuilder assignWork(String host, int port, int partition,
List<InputAttemptIdentifier> inputs) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 25718e1..094f0fe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
@@ -74,12 +73,12 @@ public class ShuffleUtils {
}
@SuppressWarnings("resource")
- public static void shuffleToMemory(Configuration conf,
- MemoryFetchedInput fetchedInput, InputStream input,
- int decompressedLength, int compressedLength,
- CompressionCodec codec, Log LOG) throws IOException {
+ public static void shuffleToMemory(MemoryFetchedInput fetchedInput,
+ InputStream input, int decompressedLength, int compressedLength,
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
+ Log LOG) throws IOException {
IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
- conf);
+ ifileReadAhead, ifileReadAheadLength);
input = checksumIn;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/03fa6d04/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 019fd0e..17b7174 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -96,7 +96,7 @@ public class TestIFile {
List<KVPair> data = KVDataGen.generateTestData(true);
writeTestFile(outputPath, false, data);
- IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+ IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, false, 0, -1);
readAndVerify(reader, data);
reader.close();
@@ -129,7 +129,7 @@ public class TestIFile {
List<KVPair> data = KVDataGen.generateTestData(true);
writeTestFile(outputPath, true, data);
- IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+ IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, false, 0, -1);
readAndVerify(reader, data);
reader.close();