Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 15:11:07 UTC
svn commit: r693455 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Tue Sep 9 06:11:05 2008
New Revision: 693455
URL: http://svn.apache.org/viewvc?rev=693455&view=rev
Log:
HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading them from separate .crc files. Contributed by Jothi Padmanabhan.
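
In outline, the change appends a single CRC-32 over an intermediate file's bytes as a trailer on the file itself, so a reader can verify the data without consulting a side file. A minimal, self-contained sketch of that trailing-checksum layout, using java.util.zip.CRC32 in place of Hadoop's DataChecksum (the class name and exact trailer encoding here are illustrative, not the committed code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.zip.CRC32;

    public class TrailingCrcSketch {
      // Write: payload bytes followed by a 4-byte CRC-32 trailer.
      static byte[] writeWithTrailingCrc(byte[] data) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(file);
        out.write(data, 0, data.length);     // payload
        out.writeInt((int) crc.getValue());  // inline checksum trailer
        return file.toByteArray();
      }

      // Read: everything but the last 4 bytes is payload; verify the trailer.
      static byte[] readWithTrailingCrc(byte[] file) throws IOException {
        int dataLen = file.length - 4;
        CRC32 crc = new CRC32();
        crc.update(file, 0, dataLen);
        DataInputStream trailer =
            new DataInputStream(new ByteArrayInputStream(file, dataLen, 4));
        if (trailer.readInt() != (int) crc.getValue()) {
          throw new IOException("checksum error in intermediate file");
        }
        return Arrays.copyOf(file, dataLen);
      }

      public static void main(String[] args) throws IOException {
        byte[] file = writeWithTrailingCrc("map output".getBytes());
        System.out.println(new String(readWithTrailingCrc(file)));
      }
    }

The IFileOutputStream/IFileInputStream pair added below implements the same idea as streams, verifying incrementally as bytes are read.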
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 9 06:11:05 2008
@@ -316,6 +316,9 @@
GenericMRLoadGenerator public, so they can be used in other contexts.
(Lingyun Yang via omalley)
+ HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
+ it from a different .crc file. (Jothi Padmanabhan via ddas)
+
BUG FIXES
HADOOP-3563. Refactor the distributed upgrade code so that it is
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java Tue Sep 9 06:11:05 2008
@@ -29,13 +29,19 @@
public class LocalFileSystem extends ChecksumFileSystem {
static final URI NAME = URI.create("file:///");
static private Random rand = new Random();
-
+ FileSystem rfs;
+
public LocalFileSystem() {
- super(new RawLocalFileSystem());
+ this(new RawLocalFileSystem());
+ }
+
+ public FileSystem getRaw() {
+ return rfs;
}
public LocalFileSystem(FileSystem rawLocalFileSystem) {
super(rawLocalFileSystem);
+ rfs = rawLocalFileSystem;
}
/** Convert a path to a File. */
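
The new getRaw() accessor matters because ChecksumFileSystem, which LocalFileSystem extends, transparently maintains a hidden .crc side file for every file it creates. With checksums now carried inline by IFileOutputStream, the callers changed below (MapTask, ReduceTask, TaskTracker) switch to the raw filesystem for intermediate data so they do not checksum twice. A minimal sketch of the difference, with hypothetical paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public class RawFsSketch {
      public static void main(String[] args) throws Exception {
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        FileSystem rfs = localFs.getRaw();  // the accessor added above

        // Checksummed create: writes spill.out plus a hidden .spill.out.crc
        localFs.create(new Path("/tmp/spill.out")).close();

        // Raw create: writes only raw.out; the IFile layer embeds the CRC itself
        rfs.create(new Path("/tmp/raw.out")).close();
      }
    }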
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java Tue Sep 9 06:11:05 2008
@@ -223,9 +223,6 @@
summer.update( b, off, len );
inSum += len;
}
- // Can be removed.
- assert inSum <= bytesPerChecksum : "DataChecksum.update() : inSum " +
- inSum + " > " + " bytesPerChecksum " + bytesPerChecksum ;
}
public void update( int b ) {
summer.update( b );
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Sep 9 06:11:05 2008
@@ -66,6 +66,8 @@
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
+ IFileOutputStream checksumOut;
+
Class<K> keyClass;
Class<V> valueClass;
Serializer<K> keySerializer;
@@ -83,17 +85,18 @@
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec) throws IOException {
+ this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor.reset();
- this.compressedOut = codec.createOutputStream(out, compressor);
+ this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
- this.out = out;
+ this.out = new FSDataOutputStream(checksumOut,null);
}
this.keyClass = keyClass;
@@ -106,6 +109,7 @@
}
public void close() throws IOException {
+
// Close the serializers
keySerializer.close();
valueSerializer.close();
@@ -115,24 +119,25 @@
WritableUtils.writeVInt(out, EOF_MARKER);
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+ //Flush the stream
+ out.flush();
+
if (compressOutput) {
- // Flush data from buffers into the compressor
- out.flush();
-
// Flush & return the compressor
compressedOut.finish();
compressedOut.resetState();
CodecPool.returnCompressor(compressor);
compressor = null;
}
-
+
// Close the stream
- rawOut.flush();
+ checksumOut.close();
+
compressedBytesWritten = rawOut.getPos() - start;
-
+
// Close the underlying stream iff we own it...
if (ownOutputStream) {
- out.close();
+ rawOut.close();
}
out = null;
}
@@ -216,43 +221,71 @@
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
private static final int MAX_VINT_SIZE = 9;
- FSDataInputStream rawIn; // Raw InputStream from file
InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
long bytesRead = 0;
long fileLength = 0;
boolean eof = false;
+ IFileInputStream checksumIn;
byte[] buffer = null;
int bufferSize = DEFAULT_BUFFER_SIZE;
DataInputBuffer dataIn = new DataInputBuffer();
int recNo = 1;
-
+
+ /**
+ * Construct an IFile Reader.
+ *
+ * @param conf Configuration File
+ * @param fs FileSystem
+ * @param file Path of the file to be opened. This file should have
+ * checksum bytes for the data at the end of the file.
+ * @param codec codec
+ * @throws IOException
+ */
+
public Reader(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec) throws IOException {
- this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+ this(conf, fs.open(file),
+ fs.getFileStatus(file).getLen(),
+ codec);
}
protected Reader() {}
+
+ /**
+ * Construct an IFile Reader.
+ *
+ * @param conf Configuration File
+ * @param in The input stream
+ * @param length Length of the data in the stream, including the checksum
+ * bytes.
+ * @param codec codec
+ * @throws IOException
+ */
public Reader(Configuration conf, FSDataInputStream in, long length,
CompressionCodec codec) throws IOException {
- this.rawIn = in;
+ checksumIn = new IFileInputStream(in,length);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
- this.in = codec.createInputStream(in, decompressor);
+ this.in = codec.createInputStream(checksumIn, decompressor);
} else {
- this.in = in;
+ this.in = checksumIn;
}
this.fileLength = length;
this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
}
- public long getLength() { return fileLength; }
+ public long getLength() {
+ return fileLength - checksumIn.getSize();
+ }
- public long getPosition() throws IOException { return rawIn.getPos(); }
+ public long getPosition() throws IOException {
+ return checksumIn.getPosition();
+ }
/**
* Read upto len bytes into buf starting at offset off.
@@ -414,6 +447,11 @@
return bytesRead;
}
+ @Override
+ public long getLength() {
+ return fileLength;
+ }
+
private void dumpOnError() {
File dumpFile = new File("../output/" + taskAttemptId + ".dump");
System.err.println("Dumping corrupt map-output of " + taskAttemptId +
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Tue Sep 9 06:11:05 2008
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}.
+ */
+
+class IFileInputStream extends InputStream {
+
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final long length; //The total length of the input file
+ private final long dataLength;
+ private DataChecksum sum;
+ private long currentOffset = 0;
+ private byte b[];
+ private byte csum[] = null;
+ private int checksumSize;
+
+ /**
+ * Create a checksum input stream that reads
+ * @param in The input stream to be verified for checksum.
+ * @param len The length of the input stream including checksum bytes.
+ */
+ public IFileInputStream(InputStream in, long len) {
+ this.in = in;
+ sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+ Integer.MAX_VALUE);
+ checksumSize = sum.getChecksumSize();
+ length = len;
+ dataLength = length - checksumSize;
+ b = new byte[1];
+ }
+
+ /**
+ * Close the input stream.
+ */
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ throw new IOException("Skip not supported for IFileInputStream");
+ }
+
+ public long getPosition() {
+ return (currentOffset >= dataLength) ? dataLength : currentOffset;
+ }
+
+ public long getSize() {
+ return checksumSize;
+ }
+
+ /**
+ * Read bytes from the stream.
+ * At EOF, checksum is validated, but the checksum
+ * bytes are not passed back in the buffer.
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+
+ if (currentOffset >= dataLength) {
+ return -1;
+ }
+
+ return doRead(b,off,len);
+ }
+
+ /**
+ * Read bytes from the stream.
+ * At EOF, checksum is validated and sent back
+ * as the last four bytes of the buffer. The caller should handle
+ * these bytes appropriately
+ */
+ public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+ if (currentOffset == length) {
+ return -1;
+ }
+ else if (currentOffset >= dataLength) {
+ // If the previous read drained off all the data, then just return
+ // the checksum now. Note that checksum validation would have
+ // happened in the earlier read
+ int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+ if (len < lenToCopy) {
+ lenToCopy = len;
+ }
+ System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
+ lenToCopy);
+ currentOffset += lenToCopy;
+ return lenToCopy;
+ }
+
+ int bytesRead = doRead(b,off,len);
+
+ if (currentOffset == dataLength) {
+ if (len >= bytesRead + checksumSize) {
+ System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+ bytesRead += checksumSize;
+ currentOffset += checksumSize;
+ }
+ }
+ return bytesRead;
+ }
+
+ private int doRead(byte[]b, int off, int len) throws IOException {
+
+ // If we are trying to read past the end of data, just read
+ // the left over data
+ if (currentOffset + len > dataLength) {
+ len = (int) dataLength - (int)currentOffset;
+ }
+
+ int bytesRead = in.read(b, off, len);
+
+ if (bytesRead < 0) {
+ throw new ChecksumException("Checksum Error", 0);
+ }
+
+ sum.update(b,off,bytesRead);
+
+ currentOffset += bytesRead;
+
+ if (currentOffset == dataLength) {
+ // The last four bytes are checksum. Strip them and verify
+ csum = new byte[checksumSize];
+ IOUtils.readFully(in, csum, 0, checksumSize);
+ if (!sum.compare(csum, 0)) {
+ throw new ChecksumException("Checksum Error", 0);
+ }
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public int read() throws IOException {
+ b[0] = 0;
+ int l = read(b,0,1);
+ if (l < 0) return l;
+
+ // Upgrade the b[0] to an int so as not to misinterpret the
+ // first bit of the byte as a sign bit
+ int result = 0xFF & b[0];
+ return result;
+ }
+
+ public byte[] getChecksum() {
+ return csum;
+ }
+}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java Tue Sep 9 06:11:05 2008
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.FilterOutputStream;
+
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles
+ */
+class IFileOutputStream extends FilterOutputStream {
+ /**
+ * The output stream to be checksummed.
+ */
+ private final DataChecksum sum;
+ private byte[] barray;
+ private boolean closed = false;
+
+ /**
+ * Create a checksum output stream that writes
+ * the bytes to the given stream.
+ * @param out
+ */
+ public IFileOutputStream(OutputStream out) {
+ super(out);
+ sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+ Integer.MAX_VALUE);
+ barray = new byte[sum.getChecksumSize()];
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ sum.writeValue(barray, 0, false);
+ out.write (barray, 0, sum.getChecksumSize());
+ out.flush();
+ }
+
+ /**
+ * Write bytes to the stream.
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ sum.update(b, off,len);
+ out.write(b,off,len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ barray[0] = (byte) (b & 0xFF);
+ write(barray,0,1);
+ }
+
+}
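
With both classes in hand, a round trip shows the two read modes: read() verifies the trailer at end-of-data and never hands it back, while readWithChecksum() passes the trailer through so an intermediary can relay the bytes verbatim (which is how the TaskTracker serves map output below). A hypothetical in-memory example, placed in org.apache.hadoop.mapred since both classes are package-private:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class IFileRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] data = "intermediate bytes".getBytes();

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        IFileOutputStream out = new IFileOutputStream(sink);
        out.write(data, 0, data.length);
        out.close();                        // appends the 4-byte CRC trailer
        byte[] file = sink.toByteArray();   // data.length + 4 bytes

        byte[] buf = new byte[file.length];

        // read(): payload only; throws ChecksumException on corruption.
        IFileInputStream in1 =
            new IFileInputStream(new ByteArrayInputStream(file), file.length);
        int n = in1.read(buf, 0, buf.length);             // n == data.length

        // readWithChecksum(): payload plus trailer, still verified internally.
        IFileInputStream in2 =
            new IFileInputStream(new ByteArrayInputStream(file), file.length);
        int m = in2.readWithChecksum(buf, 0, buf.length); // m == file.length

        System.out.println(n + " data bytes, " + m + " with the checksum");
      }
    }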
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep 9 06:11:05 2008
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
@@ -425,7 +426,8 @@
private final SpillThread spillThread = new SpillThread();
private final FileSystem localFs;
-
+ private final FileSystem rfs;
+
private final Counters.Counter mapOutputByteCounter;
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineInputCounter;
@@ -439,7 +441,10 @@
localFs = FileSystem.getLocal(job);
partitions = job.getNumReduceTasks();
partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
- // sanity checks
+
+ rfs = ((LocalFileSystem)localFs).getRaw();
+
+ //sanity checks
final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
final int sortmb = job.getInt("io.sort.mb", 100);
@@ -891,7 +896,7 @@
// create spill file
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
- out = localFs.create(filename);
+ out = rfs.create(filename);
// create spill index
Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
getTaskID(), numSpills,
@@ -972,7 +977,7 @@
// create spill file
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
- out = localFs.create(filename);
+ out = rfs.create(filename);
// create spill index
Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
getTaskID(), numSpills,
@@ -1107,15 +1112,15 @@
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
- finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+ finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
- localFs.rename(filename[0],
- new Path(filename[0].getParent(), "file.out"));
- localFs.rename(indexFileName[0],
- new Path(indexFileName[0].getParent(),"file.out.index"));
- return;
+ rfs.rename(filename[0],
+ new Path(filename[0].getParent(), "file.out"));
+ localFs.rename(indexFileName[0],
+ new Path(indexFileName[0].getParent(),"file.out.index"));
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -1129,9 +1134,10 @@
getTaskID(), finalIndexFileSize);
//The output stream for the final single output file
- FSDataOutputStream finalOut = localFs.create(finalOutputFile, true,
- 4096);
-
+
+ FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
+ 4096);
+
//The final index file output stream
FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
4096);
@@ -1160,8 +1166,9 @@
long rawSegmentLength = indexIn.readLong();
long segmentLength = indexIn.readLong();
indexIn.close();
- FSDataInputStream in = localFs.open(filename[i]);
+ FSDataInputStream in = rfs.open(filename[i]);
in.seek(segmentOffset);
+
Segment<K, V> s =
new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
true);
@@ -1176,7 +1183,7 @@
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter =
- Merger.merge(job, localFs,
+ Merger.merge(job, rfs,
keyClass, valClass,
segmentList, job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()),
@@ -1203,7 +1210,7 @@
finalIndexOut.close();
//cleanup
for(int i = 0; i < numSpills; i++) {
- localFs.delete(filename[i], true);
+ rfs.delete(filename[i],true);
localFs.delete(indexFileName[i], true);
}
}
@@ -1223,7 +1230,7 @@
//StringBuffer sb = new StringBuffer();
indexOut.writeLong(start);
indexOut.writeLong(writer.getRawLength());
- long segmentLength = out.getPos() - start;
+ long segmentLength = writer.getCompressedLength();
indexOut.writeLong(segmentLength);
LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
segmentLength + ")");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Sep 9 06:11:05 2008
@@ -128,7 +128,10 @@
DataInputBuffer getKey() { return key; }
DataInputBuffer getValue() { return value; }
- long getLength() { return segmentLength; }
+ long getLength() {
+ return (reader == null) ?
+ segmentLength : reader.getLength();
+ }
boolean next() throws IOException {
return reader.next(key, value);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 9 06:11:05 2008
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.Math;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
@@ -335,7 +336,8 @@
}
FileSystem lfs = FileSystem.getLocal(job);
-
+ FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
+
// Initialize the codec
codec = initCodec();
@@ -362,7 +364,7 @@
LOG.info("Initiating final on-disk merge with " + mapFiles.length +
" files");
RawKeyValueIterator rIter =
- Merger.merge(job, lfs,
+ Merger.merge(job,rfs,
job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
codec, mapFiles, !conf.getKeepFailedTaskFiles(),
job.getInt("io.sort.factor", 100), tempDir,
@@ -509,6 +511,7 @@
*/
private FileSystem localFileSys;
+ private FileSystem rfs;
/**
* Number of files to merge at a time
*/
@@ -1215,13 +1218,16 @@
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());
- mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength);
+ mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
+ (int)decompressedLength,
+ (int)compressedLength);
} else {
LOG.info("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());
- mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
+ mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
+ compressedLength);
}
return mapOutput;
@@ -1266,7 +1272,8 @@
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
URLConnection connection,
InputStream input,
- int mapOutputLength)
+ int mapOutputLength,
+ int compressedLength)
throws IOException, InterruptedException {
// Reserve ram for the map-output
boolean createdNow = ramManager.reserve(mapOutputLength, input);
@@ -1289,6 +1296,11 @@
throw ioe;
}
}
+
+ IFileInputStream checksumIn =
+ new IFileInputStream(input,compressedLength);
+
+ input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
@@ -1402,7 +1414,7 @@
OutputStream output = null;
long bytesRead = 0;
try {
- output = localFileSys.create(localFilename);
+ output = rfs.create(localFilename);
byte[] buf = new byte[64 * 1024];
int n = input.read(buf, 0, buf.length);
@@ -1541,7 +1553,9 @@
(long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
localFileSys = FileSystem.getLocal(conf);
-
+
+ rfs = ((LocalFileSystem)localFileSys).getRaw();
+
// hosts -> next contact time
this.penaltyBox = new LinkedHashMap<String, Long>();
@@ -2187,7 +2201,7 @@
approxOutputSize, conf)
.suffix(".merged");
Writer writer =
- new Writer(conf, localFileSys, outputPath,
+ new Writer(conf,rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec);
@@ -2195,7 +2209,7 @@
Path tmpDir = new Path(reduceTask.getTaskID().toString());
final Reporter reporter = getReporter(umbilical);
try {
- iter = Merger.merge(conf, localFileSys,
+ iter = Merger.merge(conf, rfs,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, mapFiles.toArray(new Path[mapFiles.size()]),
@@ -2275,7 +2289,7 @@
reduceTask.getTaskID(), ramfsMergeOutputSize);
Writer writer =
- new Writer(conf, localFileSys, outputPath,
+ new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec);
@@ -2289,7 +2303,7 @@
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
- rIter = Merger.merge(conf, localFileSys,
+ rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
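
On the reduce side the same verification happens as map output streams in over HTTP: the raw connection stream is wrapped in an IFileInputStream sized to the compressed (on-the-wire) length, and decompression layers on top of the verified stream. A sketch of that ordering (a hypothetical helper, not the actual shuffleInMemory code; it would live in org.apache.hadoop.mapred):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Decompressor;

    class ShuffleWrapSketch {
      static InputStream wrap(InputStream rawShuffle, long compressedLength,
                              CompressionCodec codec, Decompressor decompressor)
          throws IOException {
        // Verify the inline CRC as bytes arrive from the map side.
        InputStream checked = new IFileInputStream(rawShuffle, compressedLength);
        // Any decompression reads from the already-verified stream.
        return (codec == null)
            ? checked
            : codec.createInputStream(checked, decompressor);
      }
    }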
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 9 06:11:05 2008
@@ -59,6 +59,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableUtils;
@@ -2538,6 +2539,8 @@
OutputStream outStream = null;
FSDataInputStream indexIn = null;
FSDataInputStream mapOutputIn = null;
+
+ IFileInputStream checksumInputStream = null;
long totalRead = 0;
ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
@@ -2598,8 +2601,9 @@
* send it to the reducer.
*/
//open the map-output file
- mapOutputIn = fileSys.open(mapOutputFileName);
-
+ FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
+
+ mapOutputIn = rfs.open(mapOutputFileName);
// TODO: Remove this after a 'fix' for HADOOP-3647
// The clever trick here to reduce the impact of the extra seek for
// logging the first key/value lengths is to read the lengths before
@@ -2618,8 +2622,9 @@
//seek to the correct offset for the reduce
mapOutputIn.seek(startOffset);
+ checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
- int len = mapOutputIn.read(buffer, 0,
+ int len = checksumInputStream.readWithChecksum(buffer, 0,
partLength < MAX_BYTES_TO_READ
? (int)partLength : MAX_BYTES_TO_READ);
while (len > 0) {
@@ -2633,9 +2638,9 @@
}
totalRead += len;
if (totalRead == partLength) break;
- len = mapOutputIn.read(buffer, 0,
- (partLength - totalRead) < MAX_BYTES_TO_READ
- ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+ len = checksumInputStream.readWithChecksum(buffer, 0,
+ (partLength - totalRead) < MAX_BYTES_TO_READ
+ ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
}
LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
@@ -2660,8 +2665,9 @@
if (indexIn != null) {
indexIn.close();
}
- if (mapOutputIn != null) {
- mapOutputIn.close();
+
+ if (checksumInputStream != null) {
+ checksumInputStream.close();
}
shuffleMetrics.serverHandlerFree();
if (ClientTraceLog.isInfoEnabled()) {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Tue Sep 9 06:11:05 2008
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
@@ -75,10 +76,11 @@
public void runValueIterator(Path tmpDir, Pair[] vals,
Configuration conf,
CompressionCodec codec) throws IOException {
- FileSystem fs = tmpDir.getFileSystem(conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(tmpDir, "data.in");
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, codec);
+ new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class, codec);
for(Pair p: vals) {
writer.append(new Text(p.key), new Text(p.value));
}
@@ -86,7 +88,7 @@
@SuppressWarnings("unchecked")
RawKeyValueIterator rawItr =
- Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path},
+ Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
false, conf.getInt("io.sort.factor", 100), tmpDir,
new Text.Comparator(), new NullProgress());
@SuppressWarnings("unchecked") // WritableComparators are not generic