Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/02 07:49:08 UTC
svn commit: r722389 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Mon Dec 1 22:49:07 2008
New Revision: 722389
URL: http://svn.apache.org/viewvc?rev=722389&view=rev
Log:
HADOOP-4649. Improve abstraction for spill indices.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java
Removed:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
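For orientation before the diffs: HADOOP-4649 replaces the free-standing IndexRecord[] arrays, previously loaded through the static IndexRecord.readIndexFile helper in the removed IndexRecord.java, with a SpillRecord object that owns its own serialization and checksumming. A minimal sketch of the new call pattern (illustrative only; SpillRecord is package-private, and the real class appears in full below):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    class SpillRecordUsageSketch {
      static IndexRecord readOnePartition(Path indexFile, JobConf conf, int reduce)
          throws IOException {
        // Old shape (removed): IndexRecord.readIndexFile(indexFile, conf)[reduce]
        SpillRecord spill = new SpillRecord(indexFile, conf); // reads file, verifies CRC32
        return spill.getIndex(reduce); // <startOffset, rawLength, partLength>
      }
    }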
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Dec 1 22:49:07 2008
@@ -147,6 +147,8 @@
HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler.
(Sreekanth Ramakrishnan via yhemanth)
+ HADOOP-4649. Improve abstraction for spill indices. (cdouglas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Mon Dec 1 22:49:07 2008
@@ -40,7 +40,7 @@
import org.apache.hadoop.io.serializer.Serializer;
/**
- * <code>IFile</code> is the simple <key-len, key, value-len, value> format
+ * <code>IFile</code> is the simple <key-len, value-len, key, value> format
* for the intermediate map-outputs in Map-Reduce.
*
* There is a <code>Writer</code> to write out map-outputs in this format and
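The one-character doc fix above matters: both lengths are written first, then the key and value bytes. A sketch of one record under the corrected <key-len, value-len, key, value> layout, assuming vint-encoded lengths as IFile.Writer uses (the helper name is hypothetical):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class IFileRecordSketch {
      // One serialized map-output record: <key-len, value-len, key, value>.
      static void writeRecord(DataOutputStream out, byte[] key, byte[] value)
          throws IOException {
        WritableUtils.writeVInt(out, key.length);   // key-len
        WritableUtils.writeVInt(out, value.length); // value-len
        out.write(key, 0, key.length);              // key bytes
        out.write(value, 0, value.length);          // value bytes
      }
    }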
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java Mon Dec 1 22:49:07 2008
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.mapred;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -66,7 +65,7 @@
info = readIndexFileToCache(fileName, mapId);
} else {
synchronized (info) {
- while (null == info.indexRecordArray) {
+ while (null == info.mapSpillRecord) {
try {
info.wait();
} catch (InterruptedException e) {
@@ -77,14 +76,13 @@
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
}
- if (info.indexRecordArray.length == 0 ||
- info.indexRecordArray.length < reduce) {
- System.out.println("I am failing here");
+ if (info.mapSpillRecord.size() == 0 ||
+ info.mapSpillRecord.size() < reduce) {
throw new IOException("Invalid request " +
" Map Id = " + mapId + " Reducer = " + reduce +
- " Index Info Length = " + info.indexRecordArray.length);
+ " Index Info Length = " + info.mapSpillRecord.size());
}
- return info.indexRecordArray[reduce];
+ return info.mapSpillRecord.getIndex(reduce);
}
private IndexInformation readIndexFileToCache(Path indexFileName,
@@ -93,7 +91,7 @@
IndexInformation newInd = new IndexInformation();
if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
synchronized (info) {
- while (null == info.indexRecordArray) {
+ while (null == info.mapSpillRecord) {
try {
info.wait();
} catch (InterruptedException e) {
@@ -105,16 +103,16 @@
return info;
}
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
- IndexRecord[] tmp = null;
+ SpillRecord tmp = null;
try {
- tmp = IndexRecord.readIndexFile(indexFileName, conf);
+ tmp = new SpillRecord(indexFileName, conf);
} catch (Throwable e) {
- tmp = new IndexRecord[0];
+ tmp = new SpillRecord(0);
cache.remove(mapId);
- throw new IOException("Error Reading IndexFile",e);
+ throw new IOException("Error Reading IndexFile", e);
} finally {
synchronized (newInd) {
- newInd.indexRecordArray = tmp;
+ newInd.mapSpillRecord = tmp;
newInd.notifyAll();
}
}
@@ -157,11 +155,12 @@
}
private static class IndexInformation {
- IndexRecord[] indexRecordArray = null;
+ SpillRecord mapSpillRecord;
int getSize() {
- return ((indexRecordArray == null) ?
- 0 : indexRecordArray.length * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ return mapSpillRecord == null
+ ? 0
+ : mapSpillRecord.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
}
}
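The hunks above swap indexRecordArray for mapSpillRecord but leave IndexCache's publish/wait handshake intact: a thread that misses installs a placeholder IndexInformation via putIfAbsent, and any thread that finds a placeholder blocks until mapSpillRecord is published. A condensed sketch of that pattern (Holder is a hypothetical stand-in for IndexInformation):

    class Holder {
      SpillRecord mapSpillRecord; // null until the loading thread publishes it

      // Waiting side: threads that hit an entry still being loaded.
      synchronized SpillRecord awaitLoaded() {
        while (null == mapSpillRecord) {
          try {
            wait();
          } catch (InterruptedException e) {
            // loop and re-check, as the cache code above does
          }
        }
        return mapSpillRecord;
      }

      // Publishing side: the single thread that read the index file.
      synchronized void publish(SpillRecord loaded) {
        mapSpillRecord = loaded;
        notifyAll();
      }
    }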
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Dec 1 22:49:07 2008
@@ -448,10 +448,10 @@
private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
- private ArrayList<IndexRecord[]> indexCacheList;
+ private ArrayList<SpillRecord> indexCacheList;
private int totalIndexCacheMemory;
- private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
-
+ private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
Reporter reporter) throws IOException {
@@ -463,7 +463,7 @@
rfs = ((LocalFileSystem)localFs).getRaw();
- indexCacheList = new ArrayList<IndexRecord[]>();
+ indexCacheList = new ArrayList<SpillRecord>();
//sanity checks
final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
@@ -912,37 +912,19 @@
: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
- FSDataOutputStream indexOut = null;
- IFileOutputStream indexChecksumOut = null;
- IndexRecord[] irArray = null;
try {
// create spill file
- Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final SpillRecord spillRec = new SpillRecord(partitions);
+ final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+ numSpills, size);
out = rfs.create(filename);
- // All records (reducers) of a given spill go to
- // the same destination (memory or file).
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- indexOut = rfs.create(indexFilename);
- indexChecksumOut = new IFileOutputStream(indexOut);
- }
- else {
- irArray = new IndexRecord[partitions];
- indexCacheList.add(numSpills,irArray);
- totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
-
-
final int endPosition = (kvend > kvstart)
? kvend
: kvoffsets.length + kvend;
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
int spindex = kvstart;
+ IndexRecord rec = new IndexRecord();
InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
@@ -983,28 +965,34 @@
// close the writer
writer.close();
-
- if (indexChecksumOut != null) {
- // write the index as <offset, raw-length, compressed-length>
- writeIndexRecord(indexChecksumOut, segmentStart, writer);
- }
- else {
- irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength());
- }
+
+ // record offsets
+ rec.startOffset = segmentStart;
+ rec.rawLength = writer.getRawLength();
+ rec.partLength = writer.getCompressedLength();
+ spillRec.putIndex(rec, i);
+
writer = null;
} finally {
if (null != writer) writer.close();
}
}
+
+ if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ // create spill index file
+ Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+ getTaskID(), numSpills,
+ partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillRec.writeToFile(indexFilename, job);
+ } else {
+ indexCacheList.add(spillRec);
+ totalIndexCacheMemory +=
+ spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
- if (indexChecksumOut != null) {
- indexChecksumOut.close();
- }
- if (indexOut != null) indexOut.close();
}
}
@@ -1017,38 +1005,22 @@
throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
- FSDataOutputStream indexOut = null;
- IFileOutputStream indexChecksumOut = null;
- IndexRecord[] irArray = null;
final int partition = partitioner.getPartition(key, value, partitions);
try {
// create spill file
- Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final SpillRecord spillRec = new SpillRecord(partitions);
+ final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+ numSpills, size);
out = rfs.create(filename);
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-
- indexOut = rfs.create(indexFilename);
- indexChecksumOut = new IFileOutputStream(indexOut);
- }
- else {
- irArray = new IndexRecord[partitions];
- indexCacheList.add(numSpills,irArray);
- totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
-
// we don't run the combiner for a single record
+ IndexRecord rec = new IndexRecord();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
+ writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
@@ -1060,24 +1032,32 @@
}
writer.close();
- if (indexChecksumOut != null) {
- writeIndexRecord(indexChecksumOut,segmentStart,writer);
- }
- else {
- irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength());
- }
+ // record offsets
+ rec.startOffset = segmentStart;
+ rec.rawLength = writer.getRawLength();
+ rec.partLength = writer.getCompressedLength();
+ spillRec.putIndex(rec, i);
+
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
+ if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ // create spill index file
+ Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+ getTaskID(), numSpills,
+ partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillRec.writeToFile(indexFilename, job);
+ } else {
+ indexCacheList.add(spillRec);
+ totalIndexCacheMemory +=
+ spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
++numSpills;
} finally {
if (out != null) out.close();
- if (indexChecksumOut != null) indexChecksumOut.close();
- if (indexOut != null) indexOut.close();
}
}
@@ -1172,102 +1152,99 @@
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
- Path [] filename = new Path[numSpills];
-
+ final Path[] filename = new Path[numSpills];
+ final TaskAttemptID mapId = getTaskID();
+
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
+ filename[i] = mapOutputFile.getSpillFile(mapId, i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
-
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
- new Path(filename[0].getParent(),"file.out.index"));
- }
- else {
- writeSingleSpillIndexToFile(getTaskID(),
+ rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
new Path(filename[0].getParent(),"file.out.index"));
+ } else {
+ indexCacheList.get(0).writeToFile(
+ new Path(filename[0].getParent(),"file.out.index"), job);
}
return;
}
+
+ // read in paged indices
+ for (int i = indexCacheList.size(); i < numSpills; ++i) {
+ Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ indexCacheList.add(new SpillRecord(indexFileName, job));
+ }
+
//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
-
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskID(),
+ Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
finalOutFileSize);
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- getTaskID(), finalIndexFileSize);
-
- //The output stream for the final single output file
-
- FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
- 4096);
-
- //The final index file output stream
- FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
- 4096);
+ mapId, finalIndexFileSize);
- IFileOutputStream finalIndexChecksumOut =
- new IFileOutputStream(finalIndexOut);
+ //The output stream for the final single output file
+ FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
if (numSpills == 0) {
//create dummy files
- for (int i = 0; i < partitions; i++) {
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
- valClass, codec, null);
- writer.close();
- writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+ IndexRecord rec = new IndexRecord();
+ SpillRecord sr = new SpillRecord(partitions);
+ try {
+ for (int i = 0; i < partitions; i++) {
+ long segmentStart = finalOut.getPos();
+ Writer<K, V> writer =
+ new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+ writer.close();
+ rec.startOffset = segmentStart;
+ rec.rawLength = writer.getRawLength();
+ rec.partLength = writer.getCompressedLength();
+ sr.putIndex(rec, i);
+ }
+ sr.writeToFile(finalIndexFile, job);
+ } finally {
+ finalOut.close();
}
- finalOut.close();
- finalIndexChecksumOut.close();
- finalIndexOut.close();
return;
}
{
- for (int parts = 0; parts < partitions; parts++){
+ IndexRecord rec = new IndexRecord();
+ final SpillRecord spillRec = new SpillRecord(partitions);
+ for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
- List<Segment<K, V>> segmentList =
+ List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
- TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- final IndexRecord indexRecord =
- getIndexInformation(mapId, i, parts);
-
- long segmentOffset = indexRecord.startOffset;
- long segmentLength = indexRecord.partLength;
+ IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
- Segment<K, V> s =
- new Segment<K, V>(job, rfs, filename[i], segmentOffset,
- segmentLength, codec, true);
+ Segment<K,V> s =
+ new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
+ indexRecord.partLength, codec, true);
segmentList.add(i, s);
-
+
if (LOG.isDebugEnabled()) {
- long rawSegmentLength = indexRecord.rawLength;
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
- "Spill =" + i + "(" + segmentOffset + ","+
- rawSegmentLength + ", " + segmentLength + ")");
+ "Spill =" + i + "(" + indexRecord.startOffset + "," +
+ indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
-
+
//merge
@SuppressWarnings("unchecked")
- RawKeyValueIterator kvIter =
- Merger.merge(job, rfs,
+ RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(getTaskID().toString()),
+ segmentList, job.getInt("io.sort.factor", 100),
+ new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter,
null, spilledRecordsCounter);
//write merged output to disk
long segmentStart = finalOut.getPos();
- Writer<K, V> writer =
+ Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (null == combinerClass || numSpills < minSpillsForCombine) {
@@ -1279,99 +1256,21 @@
//close
writer.close();
-
- //write index record
- writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+
+ // record offsets
+ rec.startOffset = segmentStart;
+ rec.rawLength = writer.getRawLength();
+ rec.partLength = writer.getCompressedLength();
+ spillRec.putIndex(rec, parts);
}
+ spillRec.writeToFile(finalIndexFile, job);
finalOut.close();
- finalIndexChecksumOut.close();
- finalIndexOut.close();
- //cleanup
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
- private void writeIndexRecord(IFileOutputStream indexOut,
- long start,
- Writer<K, V> writer)
- throws IOException {
- //when we write the offset/decompressed-length/compressed-length to
- //the final index file, we write longs for both compressed and
- //decompressed lengths. This helps us to reliably seek directly to
- //the offset/length for a partition when we start serving the
- //byte-ranges to the reduces. We probably waste some space in the
- //file by doing this as opposed to writing VLong but it helps us later on.
- // index record: <offset, raw-length, compressed-length>
- //StringBuffer sb = new StringBuffer();
-
- DataOutputStream wrapper = new DataOutputStream(indexOut);
- wrapper.writeLong(start);
- wrapper.writeLong(writer.getRawLength());
- long segmentLength = writer.getCompressedLength();
- wrapper.writeLong(segmentLength);
- LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
- segmentLength + ")");
- }
-
- /**
- * This function returns the index information for the given mapId, Spill
- * number and reducer combination. Index Information is obtained
- * transparently from either the indexMap or the underlying indexFile
- * @param mapId
- * @param spillNum
- * @param reducer
- * @return
- * @throws IOException
- */
- private IndexRecord getIndexInformation( TaskAttemptID mapId,
- int spillNum,
- int reducer)
- throws IOException {
- IndexRecord[] irArray = null;
-
- if (indexCacheList.size() > spillNum) {
- irArray = indexCacheList.get(spillNum);
- }
- else {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
- irArray = IndexRecord.readIndexFile(indexFileName, job);
- indexCacheList.add(spillNum,irArray);
- rfs.delete(indexFileName,false);
- }
- return irArray[reducer];
- }
-
- /**
- * This function writes index information from the indexMap to the
- * index file that could be used by mergeParts
- * @param mapId
- * @param finalName
- * @throws IOException
- */
- private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
- Path finalName)
- throws IOException {
-
- IndexRecord[] irArray = null;
-
- irArray = indexCacheList.get(0);
-
- FSDataOutputStream indexOut = rfs.create(finalName);
- IFileOutputStream indexChecksumOut = new IFileOutputStream (indexOut);
- DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
-
- for (int i = 0; i < irArray.length; i++) {
- wrapper.writeLong(irArray[i].startOffset);
- wrapper.writeLong(irArray[i].rawLength);
- wrapper.writeLong(irArray[i].partLength);
- }
-
- wrapper.close();
- indexOut.close();
- }
-
} // MapOutputBuffer
/**
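Both sortAndSpill and the single-record spill path now end with the same decision, moved to after all partitions are written: spill indices stay in memory until the 1 MB INDEX_CACHE_MEMORY_LIMIT is spent, after which they are written to disk, and mergeParts pages the on-disk ones back into indexCacheList before merging. A condensed sketch of that repeated block as a hypothetical helper, assuming the MapOutputBuffer fields used above:

    // Hypothetical extraction of the cache-or-write block repeated above.
    private void saveSpillIndex(SpillRecord spillRec) throws IOException {
      if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
        // Budget spent: this spill's index goes to disk with its own checksum.
        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
            getTaskID(), numSpills,
            partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
        spillRec.writeToFile(indexFilename, job);
      } else {
        // Under budget: keep it in memory and charge three longs per partition.
        indexCacheList.add(spillRec);
        totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      }
    }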
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java?rev=722389&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java Mon Dec 1 22:49:07 2008
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+class SpillRecord {
+
+ /** Backing store */
+ private final ByteBuffer buf;
+ /** View of backing storage as longs */
+ private final LongBuffer entries;
+
+ public SpillRecord(int numPartitions) {
+ buf = ByteBuffer.allocate(
+ numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ entries = buf.asLongBuffer();
+ }
+
+ public SpillRecord(Path indexFileName, JobConf job) throws IOException {
+ this(indexFileName, job, new CRC32());
+ }
+
+ public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
+ throws IOException {
+
+ final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+ final FSDataInputStream in = rfs.open(indexFileName);
+ try {
+ final long length = rfs.getFileStatus(indexFileName).getLen();
+ final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+ buf = ByteBuffer.allocate(size);
+ if (crc != null) {
+ crc.reset();
+ CheckedInputStream chk = new CheckedInputStream(in, crc);
+ IOUtils.readFully(chk, buf.array(), 0, size);
+ if (chk.getChecksum().getValue() != in.readLong()) {
+ throw new ChecksumException("Checksum error reading spill index: " +
+ indexFileName, -1);
+ }
+ } else {
+ IOUtils.readFully(in, buf.array(), 0, size);
+ }
+ entries = buf.asLongBuffer();
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Return number of IndexRecord entries in this spill.
+ */
+ public int size() {
+ return entries.capacity() / (MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+ }
+
+ /**
+ * Get spill offsets for given partition.
+ */
+ public IndexRecord getIndex(int partition) {
+ final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+ return new IndexRecord(entries.get(pos), entries.get(pos + 1),
+ entries.get(pos + 2));
+ }
+
+ /**
+ * Set spill offsets for given partition.
+ */
+ public void putIndex(IndexRecord rec, int partition) {
+ final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+ entries.put(pos, rec.startOffset);
+ entries.put(pos + 1, rec.rawLength);
+ entries.put(pos + 2, rec.partLength);
+ }
+
+ /**
+ * Write this spill record to the location provided.
+ */
+ public void writeToFile(Path loc, JobConf job)
+ throws IOException {
+ writeToFile(loc, job, new CRC32());
+ }
+
+ public void writeToFile(Path loc, JobConf job, Checksum crc)
+ throws IOException {
+ final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+ CheckedOutputStream chk = null;
+ final FSDataOutputStream out = rfs.create(loc);
+ try {
+ if (crc != null) {
+ crc.reset();
+ chk = new CheckedOutputStream(out, crc);
+ chk.write(buf.array());
+ out.writeLong(chk.getChecksum().getValue());
+ } else {
+ out.write(buf.array());
+ }
+ } finally {
+ if (chk != null) {
+ chk.close();
+ } else {
+ out.close();
+ }
+ }
+ }
+
+}
+
+class IndexRecord {
+ long startOffset;
+ long rawLength;
+ long partLength;
+
+ public IndexRecord() { }
+
+ public IndexRecord(long startOffset, long rawLength, long partLength) {
+ this.startOffset = startOffset;
+ this.rawLength = rawLength;
+ this.partLength = partLength;
+ }
+}
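The resulting index file holds size() records of three longs each, followed by one long carrying the CRC32 of those bytes; the reading constructor recomputes and compares it, throwing ChecksumException on mismatch. A round-trip sketch under those assumptions (the path is hypothetical):

    package org.apache.hadoop.mapred; // SpillRecord is package-private

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    class SpillRecordRoundTrip {
      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        Path loc = new Path("/tmp/spill0.out.index"); // hypothetical location
        SpillRecord out = new SpillRecord(2);
        out.putIndex(new IndexRecord(0L, 100L, 64L), 0);
        out.putIndex(new IndexRecord(64L, 90L, 58L), 1);
        out.writeToFile(loc, job);                  // 2 * 3 longs + trailing CRC32
        SpillRecord in = new SpillRecord(loc, job); // re-reads, verifies checksum
        assert in.size() == 2 && in.getIndex(1).partLength == 58L;
      }
    }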
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Mon Dec 1 22:49:07 2008
@@ -21,7 +21,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -101,6 +104,41 @@
checkRecord(rec, totalsize);
}
+ public void testBadIndex() throws Exception {
+ final int parts = 30;
+ JobConf conf = new JobConf();
+ FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "cache").makeQualified(fs);
+ fs.delete(p, true);
+ conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+ IndexCache cache = new IndexCache(conf);
+
+ Path f = new Path(p, "badindex");
+ FSDataOutputStream out = fs.create(f, false);
+ CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
+ DataOutputStream dout = new DataOutputStream(iout);
+ for (int i = 0; i < parts; ++i) {
+ for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
+ if (0 == (i % 3)) {
+ dout.writeLong(i);
+ } else {
+ out.writeLong(i);
+ }
+ }
+ }
+ out.writeLong(iout.getChecksum().getValue());
+ dout.close();
+ try {
+ cache.getIndexInformation("badindex", 7, f);
+ fail("Did not detect bad checksum");
+ } catch (IOException e) {
+ if (!(e.getCause() instanceof ChecksumException)) {
+ throw e;
+ }
+ }
+ }
+
private static void checkRecord(IndexRecord rec, long fill) {
assertEquals(fill, rec.startOffset);
assertEquals(fill, rec.rawLength);
@@ -110,13 +148,14 @@
private static void writeFile(FileSystem fs, Path f, long fill, int parts)
throws IOException {
FSDataOutputStream out = fs.create(f, false);
- IFileOutputStream iout = new IFileOutputStream(out);
+ CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
DataOutputStream dout = new DataOutputStream(iout);
for (int i = 0; i < parts; ++i) {
- dout.writeLong(fill);
- dout.writeLong(fill);
- dout.writeLong(fill);
+ for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
+ dout.writeLong(fill);
+ }
}
+ out.writeLong(iout.getChecksum().getValue());
dout.close();
}
}
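The corruption trick in testBadIndex above: longs written through dout pass through the CheckedOutputStream and feed the running CRC32, while longs written through the raw out bypass it, so the trailing checksum cannot cover the whole payload and SpillRecord's verifying constructor must reject the file. A standalone sketch of the mechanism:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedOutputStream;

    class ChecksumBypassSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        CheckedOutputStream chk = new CheckedOutputStream(raw, new CRC32());
        DataOutputStream dout = new DataOutputStream(chk);
        dout.writeLong(1L);                       // included in the CRC32
        new DataOutputStream(raw).writeLong(2L);  // bypasses the CRC32
        // The checksum now covers 8 of the 16 payload bytes, so a reader
        // verifying a trailing CRC over all 16 will detect the corruption.
        System.out.println(chk.getChecksum().getValue());
      }
    }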