Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/19 12:53:00 UTC
svn commit: r697045 - in /hadoop/core/trunk: ./ conf/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Fri Sep 19 03:52:59 2008
New Revision: 697045
URL: http://svn.apache.org/viewvc?rev=697045&view=rev
Log:
HADOOP-3638. Caches the iFile index files in memory to reduce seeks. Contributed by Jothi Padmanabhan.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 03:52:59 2008
@@ -403,6 +403,9 @@
HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
it from a different .crc file. (Jothi Padmanabhan via ddas)
+ HADOOP-3638. Caches the iFile index files in memory to reduce seeks.
+ (Jothi Padmanabhan via ddas)
+
BUG FIXES
HADOOP-3563. Refactor the distributed upgrade code so that it is
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Sep 19 03:52:59 2008
@@ -1551,4 +1551,12 @@
</description>
</property>
+<property>
+ <name>mapred.tasktracker.indexcache.mb</name>
+ <value>10</value>
+ <description>The maximum memory, in megabytes, that a task tracker
+ allows for the index cache used when serving map outputs to reducers.
+ </description>
+</property>
+
</configuration>
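
This setting is read once, when the TaskTracker constructs its IndexCache, so it applies tracker-wide rather than per job. A minimal sketch of overriding it, assuming only the property name and 10 MB default shown above (the surrounding code is illustrative):

    // Hypothetical override; mirrors how the IndexCache constructor reads it.
    JobConf conf = new JobConf();
    conf.setInt("mapred.tasktracker.indexcache.mb", 64);  // allow 64 MB of index data
    // IndexCache converts megabytes to bytes internally:
    int maxBytes = conf.getInt("mapred.tasktracker.indexcache.mb", 10)
                   * 1024 * 1024;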
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+
+class IndexCache {
+
+ private final JobConf conf;
+ private final int totalMemoryAllowed;
+ private AtomicInteger totalMemoryUsed = new AtomicInteger();
+ private static final Log LOG = LogFactory.getLog(IndexCache.class);
+
+ private final ConcurrentHashMap<String,IndexInformation> cache =
+ new ConcurrentHashMap<String,IndexInformation>();
+
+ private final LinkedBlockingQueue<String> queue =
+ new LinkedBlockingQueue<String>();
+
+ public IndexCache(JobConf conf) {
+ this.conf = conf;
+ totalMemoryAllowed =
+ conf.getInt("mapred.tasktracker.indexcache.mb", 10) * 1024 * 1024;
+ LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ }
+
+ /**
+ * This method gets the index information for the given mapId and reduce.
+ * It reads the index file into cache if it is not already present.
+ * @param mapId the map whose index information is requested
+ * @param reduce the reduce partition to look up
+ * @param fileName The file to read the index information from if it is not
+ * already present in the cache
+ * @return The Index Information
+ * @throws IOException
+ */
+ public IndexRecord getIndexInformation(String mapId, int reduce,
+ Path fileName) throws IOException {
+
+ IndexInformation info = cache.get(mapId);
+
+ if (info == null) {
+ info = readIndexFileToCache(fileName, mapId);
+ } else {
+ synchronized (info) {
+ while (null == info.indexRecordArray) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
+ }
+ }
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+ }
+
+ if (info.indexRecordArray.length == 0 ||
+ info.indexRecordArray.length <= reduce) {
+ throw new IOException("Invalid request: Map Id = " + mapId +
+ ", Reducer = " + reduce +
+ ", Index Info Length = " + info.indexRecordArray.length);
+ }
+ return info.indexRecordArray[reduce];
+ }
+
+ private IndexInformation readIndexFileToCache(Path indexFileName,
+ String mapId) throws IOException {
+ IndexInformation info;
+ IndexInformation newInd = new IndexInformation();
+ if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
+ synchronized (info) {
+ while (null == info.indexRecordArray) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
+ }
+ }
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+ return info;
+ }
+ LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+ IndexRecord[] tmp = null;
+ try {
+ tmp = IndexRecord.readIndexFile(indexFileName, conf);
+ } catch (Throwable e) {
+ tmp = new IndexRecord[0];
+ cache.remove(mapId);
+ throw new IOException("Error Reading IndexFile",e);
+ } finally {
+ synchronized (newInd) {
+ newInd.indexRecordArray = tmp;
+ newInd.notifyAll();
+ }
+ }
+ queue.add(mapId);
+
+ if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+ freeIndexInformation();
+ }
+ return newInd;
+ }
+
+ /**
+ * This method removes the map from the cache. It should be called when
+ * a map output on this tracker is discarded.
+ * @param mapId The taskID of this map.
+ */
+ public void removeMap(String mapId) {
+ IndexInformation info = cache.remove(mapId);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
+ if (!queue.remove(mapId)) {
+ LOG.warn("Map ID" + mapId + " not found in queue!!");
+ }
+ } else {
+ LOG.info("Map ID " + mapId + " not found in cache");
+ }
+ }
+
+ /**
+ * Bring memory usage below totalMemoryAllowed.
+ */
+ private synchronized void freeIndexInformation() {
+ while (totalMemoryUsed.get() > totalMemoryAllowed) {
+ String s = queue.remove();
+ IndexInformation info = cache.remove(s);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
+ }
+ }
+ }
+
+ private static class IndexInformation {
+ IndexRecord[] indexRecordArray = null;
+
+ int getSize() {
+ return ((indexRecordArray == null) ?
+ 0 : indexRecordArray.length * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ }
+ }
+}
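
Both getIndexInformation and readIndexFileToCache above lean on one idiom: the first caller publishes an empty IndexInformation through putIfAbsent and performs the read, while concurrent callers block on the placeholder's monitor until indexRecordArray becomes non-null. A minimal, self-contained sketch of that idiom, with hypothetical names (SingleLoaderCache, Slot, load) standing in for the patch's classes:

    import java.util.concurrent.ConcurrentHashMap;

    class SingleLoaderCache {
      // value == null means "still loading", like IndexInformation.indexRecordArray
      private static class Slot { Object value; }

      private final ConcurrentHashMap<String, Slot> map =
          new ConcurrentHashMap<String, Slot>();

      Object get(String key) throws InterruptedException {
        Slot fresh = new Slot();
        Slot prior = map.putIfAbsent(key, fresh);
        if (prior == null) {            // won the race: load and publish
          Object loaded = load(key);    // stand-in for readIndexFile
          synchronized (fresh) {
            fresh.value = loaded;
            fresh.notifyAll();          // wake the waiters below
          }
          return loaded;
        }
        synchronized (prior) {          // lost the race: wait for the loader
          while (prior.value == null) {
            prior.wait();
          }
        }
        return prior.value;
      }

      private Object load(String key) {
        return key;                     // placeholder for the real file read
      }
    }

The while loop around wait() matters: wakeups can be spurious, so the predicate must be rechecked, exactly as the patch does.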
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+class IndexRecord {
+ final long startOffset;
+ final long rawLength;
+ final long partLength;
+
+ public IndexRecord(long startOffset, long rawLength, long partLength) {
+ this.startOffset = startOffset;
+ this.rawLength = rawLength;
+ this.partLength = partLength;
+ }
+
+ public static IndexRecord[] readIndexFile(Path indexFileName,
+ JobConf job)
+ throws IOException {
+
+ FileSystem localFs = FileSystem.getLocal(job);
+ FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
+
+ FSDataInputStream indexInputStream = rfs.open(indexFileName);
+ long length = rfs.getFileStatus(indexFileName).getLen();
+ IFileInputStream checksumIn =
+ new IFileInputStream(indexInputStream,length);
+
+ int numEntries = (int) (length / MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+ IndexRecord[] indexRecordArray = new IndexRecord[numEntries];
+
+ DataInputStream wrapper = new DataInputStream(checksumIn);
+
+ try {
+ for (int i= 0; i < numEntries; i++) {
+ long startOffset = wrapper.readLong();
+ long rawLength = wrapper.readLong();
+ long partLength = wrapper.readLong();
+ indexRecordArray[i] =
+ new IndexRecord(startOffset, rawLength, partLength);
+ }
+ }
+ finally {
+ // This would internally call checksumIn.close()
+ wrapper.close();
+ }
+ return indexRecordArray;
+ }
+}
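
An index file is a flat array of fixed-size records, one per reducer partition, each holding the three longs read above; MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH is 24 bytes (3 x 8), which is also why the test below charges partsPerMap * 24 bytes per file. A layout sketch, with IndexFileLayout and RECORD_LENGTH as illustrative stand-ins for the MapTask constant:

    class IndexFileLayout {
      // Record r (for reducer r) starts at byte offset r * 24:
      //   bytes  0..7   startOffset  -- where reducer r's data begins in the map output
      //   bytes  8..15  rawLength    -- decompressed length of that partition
      //   bytes 16..23  partLength   -- on-disk (possibly compressed) length
      static final int RECORD_LENGTH = 3 * 8;  // MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH

      static long recordOffset(int reducer) {
        return (long) reducer * RECORD_LENGTH;
      }
    }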
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Sep 19 03:52:59 2008
@@ -442,7 +442,11 @@
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
-
+
+ private ArrayList<IndexRecord[]> indexCacheList;
+ private int totalIndexCacheMemory;
+ private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
+
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
Reporter reporter) throws IOException {
@@ -454,6 +458,8 @@
rfs = ((LocalFileSystem)localFs).getRaw();
+ indexCacheList = new ArrayList<IndexRecord[]>();
+
//sanity checks
final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
@@ -902,16 +908,31 @@
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
FSDataOutputStream indexOut = null;
+ IFileOutputStream indexChecksumOut = null;
+ IndexRecord[] irArray = null;
try {
// create spill file
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
out = rfs.create(filename);
- // create spill index
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- indexOut = localFs.create(indexFilename);
+ // All records (reducers) of a given spill go to
+ // the same destination (memory or file).
+ if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ // create spill index file
+ Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+ getTaskID(), numSpills,
+ partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+ indexOut = rfs.create(indexFilename);
+ indexChecksumOut = new IFileOutputStream(indexOut);
+ }
+ else {
+ irArray = new IndexRecord[partitions];
+ indexCacheList.add(numSpills,irArray);
+ totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
+
final int endPosition = (kvend > kvstart)
? kvend
: kvoffsets.length + kvend;
@@ -957,8 +978,14 @@
// close the writer
writer.close();
- // write the index as <offset, raw-length, compressed-length>
- writeIndexRecord(indexOut, out, segmentStart, writer);
+ if (indexChecksumOut != null) {
+ // write the index as <offset, raw-length, compressed-length>
+ writeIndexRecord(indexChecksumOut, segmentStart, writer);
+ }
+ else {
+ irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+ writer.getCompressedLength());
+ }
writer = null;
} finally {
if (null != writer) writer.close();
@@ -968,6 +995,9 @@
++numSpills;
} finally {
if (out != null) out.close();
+ if (indexChecksumOut != null) {
+ indexChecksumOut.close();
+ }
if (indexOut != null) indexOut.close();
}
}
@@ -982,17 +1012,30 @@
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
FSDataOutputStream indexOut = null;
+ IFileOutputStream indexChecksumOut = null;
+ IndexRecord[] irArray = null;
final int partition = partitioner.getPartition(key, value, partitions);
try {
// create spill file
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
out = rfs.create(filename);
- // create spill index
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- indexOut = localFs.create(indexFilename);
+
+ if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ // create spill index
+ Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+ getTaskID(), numSpills,
+ partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+ indexOut = rfs.create(indexFilename);
+ indexChecksumOut = new IFileOutputStream(indexOut);
+ }
+ else {
+ irArray = new IndexRecord[partitions];
+ indexCacheList.add(numSpills,irArray);
+ totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
+
// we don't run the combiner for a single record
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
@@ -1010,8 +1053,14 @@
}
writer.close();
- // index record
- writeIndexRecord(indexOut, out, segmentStart, writer);
+ if (indexChecksumOut != null) {
+ writeIndexRecord(indexChecksumOut, segmentStart, writer);
+ }
+ else {
+ irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+ writer.getCompressedLength());
+ }
+ writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
@@ -1020,6 +1069,7 @@
++numSpills;
} finally {
if (out != null) out.close();
+ if (indexChecksumOut != null) indexChecksumOut.close();
if (indexOut != null) indexOut.close();
}
}
@@ -1116,20 +1166,23 @@
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
Path [] filename = new Path[numSpills];
- Path [] indexFileName = new Path[numSpills];
- FileSystem localFs = FileSystem.getLocal(job);
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
- indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
- rfs.rename(filename[0],
- new Path(filename[0].getParent(), "file.out"));
- localFs.rename(indexFileName[0],
- new Path(indexFileName[0].getParent(),"file.out.index"));
+ rfs.rename(filename[0],
+ new Path(filename[0].getParent(), "file.out"));
+ if (indexCacheList.size() == 0) {
+ rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
+ new Path(filename[0].getParent(),"file.out.index"));
+ }
+ else {
+ writeSingleSpillIndexToFile(getTaskID(),
+ new Path(filename[0].getParent(),"file.out.index"));
+ }
return;
}
//make correction in the length to include the sequence file header
@@ -1149,8 +1202,12 @@
4096);
//The final index file output stream
- FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
+ FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
4096);
+
+ IFileOutputStream finalIndexChecksumOut =
+ new IFileOutputStream(finalIndexOut);
+
if (numSpills == 0) {
//create dummy files
for (int i = 0; i < partitions; i++) {
@@ -1158,9 +1215,10 @@
Writer<K, V> writer = new Writer<K, V>(job, finalOut,
keyClass, valClass, codec);
writer.close();
- writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+ writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
}
finalOut.close();
+ finalIndexChecksumOut.close();
finalIndexOut.close();
return;
}
@@ -1169,13 +1227,15 @@
//create the segments to be merged
List<Segment<K, V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
+ TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- FSDataInputStream indexIn = localFs.open(indexFileName[i]);
- indexIn.seek(parts * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- long segmentOffset = indexIn.readLong();
- long rawSegmentLength = indexIn.readLong();
- long segmentLength = indexIn.readLong();
- indexIn.close();
+ IndexRecord indexRecord =
+ getIndexInformation(mapId, i, parts);
+
+ long segmentOffset = indexRecord.startOffset;
+ long rawSegmentLength = indexRecord.rawLength;
+ long segmentLength = indexRecord.partLength;
+
FSDataInputStream in = rfs.open(filename[i]);
in.seek(segmentOffset);
@@ -1185,9 +1245,11 @@
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
- LOG.debug("Index: (" + indexFileName[i] + ", " + segmentOffset +
+ LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+ "Spill =" + i + "(" + segmentOffset + ","+
rawSegmentLength + ", " + segmentLength + ")");
}
+ indexRecord = null;
}
//merge
@@ -1214,20 +1276,20 @@
writer.close();
//write index record
- writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+ writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
}
finalOut.close();
+ finalIndexChecksumOut.close();
finalIndexOut.close();
//cleanup
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
- localFs.delete(indexFileName[i], true);
}
}
}
- private void writeIndexRecord(FSDataOutputStream indexOut,
- FSDataOutputStream out, long start,
+ private void writeIndexRecord(IFileOutputStream indexOut,
+ long start,
Writer<K, V> writer)
throws IOException {
//when we write the offset/decompressed-length/compressed-length to
@@ -1238,14 +1300,73 @@
//file by doing this as opposed to writing VLong but it helps us later on.
// index record: <offset, raw-length, compressed-length>
//StringBuffer sb = new StringBuffer();
- indexOut.writeLong(start);
- indexOut.writeLong(writer.getRawLength());
+
+ DataOutputStream wrapper = new DataOutputStream(indexOut);
+ wrapper.writeLong(start);
+ wrapper.writeLong(writer.getRawLength());
long segmentLength = writer.getCompressedLength();
- indexOut.writeLong(segmentLength);
+ wrapper.writeLong(segmentLength);
LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
segmentLength + ")");
}
+ /**
+ * This method returns the index information for the given mapId, spill
+ * number, and reducer combination. The information is obtained
+ * transparently from either the in-memory indexCacheList or the
+ * underlying spill index file.
+ * @param mapId the map attempt being looked up
+ * @param spillNum the spill number
+ * @param reducer the reduce partition to look up
+ * @return the IndexRecord for the given spill and reducer
+ * @throws IOException
+ */
+ private IndexRecord getIndexInformation(TaskAttemptID mapId,
+ int spillNum,
+ int reducer)
+ throws IOException {
+ IndexRecord[] irArray = null;
+
+ if (indexCacheList.size() > spillNum) {
+ irArray = indexCacheList.get(spillNum);
+ }
+ else {
+ Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
+ irArray = IndexRecord.readIndexFile(indexFileName, job);
+ indexCacheList.add(spillNum,irArray);
+ rfs.delete(indexFileName,false);
+ }
+ return irArray[reducer];
+ }
+
+ /**
+ * This method writes the index information for a single spill from the
+ * in-memory indexCacheList to the final index file (used when mergeParts
+ * finds exactly one spill).
+ * @param mapId the map attempt whose index is being written
+ * @param finalName the path of the index file to create
+ * @throws IOException
+ */
+ private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
+ Path finalName)
+ throws IOException {
+
+ IndexRecord[] irArray = indexCacheList.get(0);
+
+ FSDataOutputStream indexOut = rfs.create(finalName);
+ IFileOutputStream indexChecksumOut = new IFileOutputStream(indexOut);
+ DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
+
+ for (int i = 0; i < irArray.length; i++) {
+ wrapper.writeLong(irArray[i].startOffset);
+ wrapper.writeLong(irArray[i].rawLength);
+ wrapper.writeLong(irArray[i].partLength);
+ }
+
+ wrapper.close();
+ indexOut.close();
+ }
+
} // MapOutputBuffer
/**
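
Taken together, the MapTask changes keep each spill's index records in memory until roughly 1 MB (INDEX_CACHE_MEMORY_LIMIT) of them has accumulated; later spills write checksummed index files instead, and getIndexInformation reads from whichever place a given spill landed. A back-of-envelope sizing sketch, using only constants visible in the diff (the variable names are illustrative):

    int partitions     = 1000;                           // example reducer count
    int bytesPerSpill  = partitions * 24;                // one 24-byte record per reducer
    int spillsInMemory = (1024 * 1024) / bytesPerSpill;  // 43 spills stay in memory

Because the limit is checked once per spill, a single spill's records are never split between memory and disk, as the comment in sortAndSpill notes.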
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 19 03:52:59 2008
@@ -207,6 +207,8 @@
* Number of maptask completion events locations to poll for at one time
*/
private int probe_sample_size = 500;
+
+ private IndexCache indexCache;
/*
* A list of commitTaskActions for whom commit response has been received
@@ -475,6 +477,7 @@
(maxCurrentMapTasks +
maxCurrentReduceTasks);
}
+ this.indexCache = new IndexCache(this.fConf);
// start the taskMemoryManager thread only if enabled
setTaskMemoryManagerEnabledFlag();
if (isTaskMemoryManagerEnabled()) {
@@ -1347,6 +1350,10 @@
// Add this tips of this job to queue of tasks to be purged
for (TaskInProgress tip : rjob.tasks) {
tip.jobHasFinished(false);
+ Task t = tip.getTask();
+ if (t.isMapTask()) {
+ indexCache.removeMap(tip.getTask().getTaskID().toString());
+ }
}
// Delete the job directory for this
// task if the job is done/failed
@@ -1381,6 +1388,9 @@
// removing the job if it's the last task
removeTaskFromJob(tip.getTask().getJobID(), tip);
tip.jobHasFinished(wasFailure);
+ if (tip.getTask().isMapTask()) {
+ indexCache.removeMap(tip.getTask().getTaskID().toString());
+ }
}
}
@@ -2748,7 +2758,6 @@
// true iff IOException was caused by attempt to access input
boolean isInputException = true;
OutputStream outStream = null;
- FSDataInputStream indexIn = null;
FSDataInputStream mapOutputIn = null;
IFileInputStream checksumInputStream = null;
@@ -2756,6 +2765,9 @@
long totalRead = 0;
ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
context.getAttribute("shuffleServerMetrics");
+ TaskTracker tracker =
+ (TaskTracker) context.getAttribute("task.tracker");
+
try {
shuffleMetrics.serverHandlerBusy();
outStream = response.getOutputStream();
@@ -2781,20 +2793,13 @@
* Read the index file to get the information about where
* the map-output for the given reducer is available.
*/
- //open index file
- indexIn = fileSys.open(indexFileName);
-
- //seek to the correct offset for the given reduce
- indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ IndexRecord info =
+ tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
- //read the offset and length of the partition data
- final long startOffset = indexIn.readLong();
- final long rawPartLength = indexIn.readLong();
- final long partLength = indexIn.readLong();
+ final long startOffset = info.startOffset;
+ final long rawPartLength = info.rawLength;
+ final long partLength = info.partLength;
- indexIn.close();
- indexIn = null;
-
//set the custom "Raw-Map-Output-Length" http header to
//the raw (decompressed) length
response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
@@ -2859,8 +2864,6 @@
rawPartLength + " from " + startOffset + " with (" +
firstKeyLength + ", " + firstValueLength + ")");
} catch (IOException ie) {
- TaskTracker tracker =
- (TaskTracker) context.getAttribute("task.tracker");
Log log = (Log) context.getAttribute("log");
String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
") failed :\n"+
@@ -2873,10 +2876,6 @@
shuffleMetrics.failedOutput();
throw ie;
} finally {
- if (indexIn != null) {
- indexIn.close();
- }
-
if (checksumInputStream != null) {
checksumInputStream.close();
}
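
On the serving side, one lookup in the shared, size-bounded IndexCache replaces an open/seek/read/close of the index file on every shuffle request, and the new removeMap calls in the purge paths keep the cache's memory accounting in step as map outputs are deleted. A condensed sketch of the request path after this patch, using identifiers from the diff (the surrounding servlet method is elided):

    TaskTracker tracker = (TaskTracker) context.getAttribute("task.tracker");
    IndexRecord info =
        tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
    long startOffset   = info.startOffset;  // seek target in the map output file
    long rawPartLength = info.rawLength;    // decompressed length, sent as a header
    long partLength    = info.partLength;   // bytes actually served to the reducer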
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import junit.framework.TestCase;
+
+public class TestIndexCache extends TestCase {
+
+ public void testLRCPolicy() throws Exception {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("seed: " + seed);
+ JobConf conf = new JobConf();
+ FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "cache").makeQualified(fs);
+ fs.delete(p, true);
+ conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+ final int partsPerMap = 1000;
+ final int bytesPerFile = partsPerMap * 24;
+ IndexCache cache = new IndexCache(conf);
+
+ // fill cache
+ int totalsize = bytesPerFile;
+ for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) {
+ Path f = new Path(p, Integer.toString(totalsize, 36));
+ writeFile(fs, f, totalsize, partsPerMap);
+ IndexRecord rec = cache.getIndexInformation(
+ Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
+ checkRecord(rec, totalsize);
+ }
+
+ // delete the files; the cache should still retain every entry
+ for (FileStatus stat : fs.listStatus(p)) {
+ fs.delete(stat.getPath(),true);
+ }
+ for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
+ Path f = new Path(p, Integer.toString(i, 36));
+ IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
+ r.nextInt(partsPerMap), f);
+ checkRecord(rec, i);
+ }
+
+ // push oldest (bytesPerFile) out of cache
+ Path f = new Path(p, Integer.toString(totalsize, 36));
+ writeFile(fs, f, totalsize, partsPerMap);
+ cache.getIndexInformation(Integer.toString(totalsize, 36),
+ r.nextInt(partsPerMap), f);
+ fs.delete(f, false);
+
+ // the oldest entry has been evicted, so re-reading it must fail
+ boolean fnf = false;
+ try {
+ cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
+ r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)));
+ } catch (IOException e) {
+ if (e.getCause() == null ||
+ !(e.getCause() instanceof FileNotFoundException)) {
+ throw e;
+ }
+ else {
+ fnf = true;
+ }
+ }
+ if (!fnf) {
+ fail("Failed to push out the oldest entry");
+ }
+ // should find all the other entries
+ for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
+ IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
+ r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
+ checkRecord(rec, i);
+ }
+ IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
+ r.nextInt(partsPerMap), f);
+ checkRecord(rec, totalsize);
+ }
+
+ private static void checkRecord(IndexRecord rec, long fill) {
+ assertEquals(fill, rec.startOffset);
+ assertEquals(fill, rec.rawLength);
+ assertEquals(fill, rec.partLength);
+ }
+
+ private static void writeFile(FileSystem fs, Path f, long fill, int parts)
+ throws IOException {
+ FSDataOutputStream out = fs.create(f, false);
+ IFileOutputStream iout = new IFileOutputStream(out);
+ DataOutputStream dout = new DataOutputStream(iout);
+ for (int i = 0; i < parts; ++i) {
+ dout.writeLong(fill);
+ dout.writeLong(fill);
+ dout.writeLong(fill);
+ }
+ dout.close();
+ }
+}
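
The sizing in testLRCPolicy is chosen so the cache fills exactly once before eviction starts; spelled out, with every number taken from the test itself:

    int partsPerMap  = 1000;                  // records per index file
    int bytesPerFile = partsPerMap * 24;      // 24,000 bytes charged per file
    int limit        = 1 * 1024 * 1024;       // mapred.tasktracker.indexcache.mb = 1
    int filesToFill  = limit / bytesPerFile;  // 43 files fit under the limit
    // The 44th insertion pushes the oldest mapId off the FIFO queue, which is
    // why the later lookup of the first file must go back to disk and, since
    // that file has been deleted, surface a FileNotFoundException.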