You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/19 19:17:46 UTC
svn commit: r497891 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Fri Jan 19 10:17:44 2007
New Revision: 497891
URL: http://svn.apache.org/viewvc?view=rev&rev=497891
Log:
HADOOP-830. Improve mapreduce merge performance by buffering and merging map outputs as they arrive at reduce nodes. Contributed by Devaraj.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 19 10:17:44 2007
@@ -47,6 +47,10 @@
14. HADOOP-735. Switch generated record code to use BytesWritable to
represent fields of type 'buffer'. (Milind Bhandarkar via cutting)
+15. HADOOP-830. Improve mapreduce merge performance by buffering and
+ merging multiple map outputs as they arrive at reduce nodes before
+ they're written to disk. (Devaraj Das via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Jan 19 10:17:44 2007
@@ -125,6 +125,18 @@
</property>
<property>
+ <name>fs.ramfs.impl</name>
+ <value>org.apache.hadoop.fs.InMemoryFileSystem</value>
+ <description>The FileSystem for ramfs: uris.</description>
+</property>
+
+<property>
+ <name>fs.inmemory.size.mb</name>
+ <value>75</value>
+ <description>The size of the in-memory filesystem instance in MB</description>
+</property>
+
+<property>
<name>dfs.datanode.bindAddress</name>
<value>0.0.0.0</value>
<description>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Fri Jan 19 10:17:44 2007
@@ -177,4 +177,11 @@
return ((Buffer)out).getPos();
}
+ public static long getChecksumLength(long size, int bytesPerSum) {
+ //the checksum length is equal to size passed divided by bytesPerSum +
+ //bytes written in the beginning of the checksum file.
+ return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 +
+ CHECKSUM_VERSION.length;
+ }
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan 19 10:17:44 2007
@@ -200,6 +200,13 @@
return new Path(file.getParent(), "."+file.getName()+".crc");
}
+ /**
+  * Compute how long the checksum file is for a data file of the given size.
+  * Delegates to FSDataOutputStream.getChecksumLength.
+  * @param fileSize the length of the data file, in bytes
+  * @param bytesPerSum the number of data bytes covered by one checksum
+  * @return the checksum file length, in bytes
+  */
+ public static long getChecksumFileLength(long fileSize, int bytesPerSum) {
+   final long checksumFileLength =
+     FSDataOutputStream.getChecksumLength(fileSize, bytesPerSum);
+   return checksumFileLength;
+ }
+
/** Return true iff file is a checksum file name.*/
public static boolean isChecksumFile(Path file) {
String name = file.getName();
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=auto&rev=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 19 10:17:44 2007
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/** An implementation of the in-memory filesystem. This implementation assumes
+ * that the file lengths are known ahead of time and that the total length of
+ * all the files is below a certain number (like 100 MB, configurable). Use the
+ * API reserveSpaceWithCheckSum(Path f, int size) (see below for a description
+ * of the API) for reserving space in the FS. The uri of this filesystem starts
+ * with ramfs:// .
+ * @author ddas
+ *
+ */
+public class InMemoryFileSystem extends FileSystem {
+ private URI uri;
+ private int fsSize;
+ private volatile int totalUsed;
+ private Path staticWorkingDir;
+ private int bytesPerSum;
+
+ //pathToFileAttribs is the final place where a file is put after it is closed
+ private Map <String, FileAttributes> pathToFileAttribs =
+ Collections.synchronizedMap(new HashMap());
+
+ //tempFileAttribs is a temp place which is updated while reserving memory for
+ //files we are going to create. It is read in the createRaw method and the
+ //temp key/value is discarded. If the file makes it to "close", then it
+ //ends up being in the pathToFileAttribs map.
+ private Map <String, FileAttributes> tempFileAttribs =
+ Collections.synchronizedMap(new HashMap());
+
+ public InMemoryFileSystem() {}
+
+ public InMemoryFileSystem(URI uri, Configuration conf) {
+ initialize(uri, conf);
+ }
+
+ /**
+  * Set up this filesystem from the given uri and configuration.
+  * Reads the capacity in MB from "fs.inmemory.size.mb" (default 100) and the
+  * checksum chunk size from "io.bytes.per.checksum" (default 512).
+  * @param uri the uri of this filesystem; only scheme and authority are kept
+  * @param conf the configuration the settings are read from
+  */
+ public void initialize(URI uri, Configuration conf) {
+   int sizeMb = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
+   // The capacity is tracked in an int. Multiply as long and cap the result,
+   // because sizeMb * 1024 * 1024 silently wraps around for sizeMb >= 2048.
+   long sizeBytes = sizeMb * 1024L * 1024L;
+   this.fsSize = (int) Math.min(sizeBytes, (long) Integer.MAX_VALUE);
+   this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+   this.staticWorkingDir = new Path(this.uri.getPath());
+   this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
+   LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
+            " of size (in bytes): " + fsSize);
+ }
+
+ //inherit javadoc
+ public URI getUri() {
+ return uri;
+ }
+
+ /** @deprecated */
+ public String getName() {
+ return uri.toString();
+ }
+
+ /**
+ * Return 1x1 'inmemory' cell if the file exists.
+ * Return null if otherwise.
+ */
+ public String[][] getFileCacheHints(Path f, long start, long len)
+ throws IOException {
+ if (! exists(f)) {
+ return null;
+ } else {
+ return new String[][] {{"inmemory"}};
+ }
+ }
+
+ /** A seekable input stream over the bytes of a closed in-memory file. */
+ private class InMemoryInputStream extends FSInputStream {
+   private DataInputBuffer din = new DataInputBuffer();
+   private FileAttributes fAttr;
+
+   /**
+    * Open the given file for reading from its first byte.
+    * @param f the path of the file to read
+    * @throws FileNotFoundException if f is not present in pathToFileAttribs
+    */
+   public InMemoryInputStream(Path f) throws IOException {
+     fAttr = pathToFileAttribs.get(getPath(f));
+     if (fAttr == null) throw new FileNotFoundException("File " + f +
+                                                        " does not exist");
+     din.reset(fAttr.data, 0, fAttr.size);
+   }
+
+   public long getPos() throws IOException {
+     return din.getPosition();
+   }
+
+   /**
+    * Reposition the stream at the given offset of the file.
+    * @param pos the offset to continue reading from, in [0, file size]
+    * @throws IOException if pos is negative or lies beyond the end of file
+    */
+   public void seek(long pos) throws IOException {
+     // Validate the long value before narrowing it. Casting first would let
+     // offsets beyond Integer.MAX_VALUE wrap around into the valid range.
+     if (pos < 0)
+       throw new IOException("Cannot seek to a negative position");
+     if (pos > fAttr.size)
+       throw new IOException("Cannot seek after EOF");
+     din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
+   }
+
+   public int available() throws IOException {
+     return din.available();
+   }
+
+   /** Mark/reset is not supported by this stream. */
+   public boolean markSupported() { return false; }
+
+   public int read() throws IOException {
+     return din.read();
+   }
+
+   public int read(byte[] b, int off, int len) throws IOException {
+     return din.read(b, off, len);
+   }
+
+   public long skip(long n) throws IOException { return din.skip(n); }
+ }
+
+ public FSInputStream openRaw(Path f) throws IOException {
+ return new InMemoryInputStream(f);
+ }
+
+ private class InMemoryOutputStream extends FSOutputStream {
+ private int count;
+ private FileAttributes fAttr;
+ private Path f;
+
+ public InMemoryOutputStream(Path f, FileAttributes fAttr)
+ throws IOException {
+ this.fAttr = fAttr;
+ this.f = f;
+ }
+
+ public long getPos() throws IOException {
+ return count;
+ }
+
+ public void close() throws IOException {
+ synchronized (InMemoryFileSystem.this) {
+ pathToFileAttribs.put(getPath(f), fAttr);
+ }
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+ int newcount = count + len;
+ if (newcount > fAttr.size) {
+ throw new IOException("Insufficient space");
+ }
+ System.arraycopy(b, off, fAttr.data, count, len);
+ count = newcount;
+ }
+
+ public void write(int b) throws IOException {
+ int newcount = count + 1;
+ if (newcount > fAttr.size) {
+ throw new IOException("Insufficient space");
+ }
+ fAttr.data[count] = (byte)b;
+ count = newcount;
+ }
+ }
+
+ /**
+  * Open an output stream on a file whose space was reserved beforehand, as
+  * done by reserveSpaceWithCheckSum. The reservation is consumed by this
+  * call. The replication and blockSize arguments are not used.
+  * @param f the path of the file to create
+  * @param overwrite whether an existing file at f may be replaced
+  * @return a stream writing into the reserved memory
+  * @throws IOException if the file exists and overwrite is false, or if no
+  *         space has been reserved for f
+  */
+ public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+                                 long blockSize) throws IOException {
+   if (exists(f) && ! overwrite) {
+     throw new IOException("File already exists:"+f);
+   }
+   synchronized (this) {
+     FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
+     if (fAttr == null) {
+       // Fail here with a clear message instead of handing back a null
+       // stream that would only blow up later in the caller.
+       throw new IOException("No space reserved for file: " + f);
+     }
+     return createRaw(f, fAttr);
+   }
+ }
+
+ public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+ long blockSize, Progressable progress) throws IOException {
+ //ignore write-progress reporter for in-mem files
+ return createRaw(f, overwrite, replication, blockSize);
+ }
+
+ public FSOutputStream createRaw(Path f, FileAttributes fAttr)
+ throws IOException {
+ //the path is not added into the filesystem (in the pathToFileAttribs
+ //map) until close is called on the outputstream that this method is
+ //going to return
+ //Create an output stream out of data byte array
+ return new InMemoryOutputStream(f, fAttr);
+ }
+
+ public void close() throws IOException {
+ super.close();
+ if (pathToFileAttribs != null)
+ pathToFileAttribs.clear();
+ pathToFileAttribs = null;
+ if (tempFileAttribs != null)
+ tempFileAttribs.clear();
+ tempFileAttribs = null;
+ }
+
+ /**
+ * Replication is not supported for the inmemory file system.
+ */
+ public short getReplication(Path src) throws IOException {
+ return 1;
+ }
+
+ public boolean setReplicationRaw(Path src, short replication)
+ throws IOException {
+ return true;
+ }
+
+ /**
+  * Move the file registered at src to dst, replacing any file already at dst.
+  * @param src the current path of the file
+  * @param dst the new path of the file
+  * @return true if the file was renamed, false if there is no file at src
+  */
+ public boolean renameRaw(Path src, Path dst) throws IOException {
+   synchronized (this) {
+     FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
+     if (fAttr == null) return false;
+     FileAttributes replaced = pathToFileAttribs.put(getPath(dst), fAttr);
+     if (replaced != null) {
+       // The file previously at dst is gone now. Release its bytes the same
+       // way deleteRaw does, otherwise totalUsed never shrinks again.
+       replaced.data = null;
+       totalUsed -= replaced.size;
+     }
+     return true;
+   }
+ }
+
+ public boolean deleteRaw(Path f) throws IOException {
+ synchronized (this) {
+ FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
+ if (fAttr != null) {
+ fAttr.data = null;
+ totalUsed -= fAttr.size;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ public boolean exists(Path f) throws IOException {
+ return pathToFileAttribs.containsKey(getPath(f));
+ }
+
+ /**
+ * Directory operations are not supported
+ */
+ public boolean isDirectory(Path f) throws IOException {
+ return false;
+ }
+
+ public long getLength(Path f) throws IOException {
+ return pathToFileAttribs.get(getPath(f)).size;
+ }
+
+ /**
+ * Directory operations are not supported
+ */
+ public Path[] listPathsRaw(Path f) throws IOException {
+ return null;
+ }
+ public void setWorkingDirectory(Path new_dir) {}
+ public Path getWorkingDirectory() {
+ return staticWorkingDir;
+ }
+ public boolean mkdirs(Path f) throws IOException {
+ return false;
+ }
+
+ /** lock operations are not supported */
+ public void lock(Path f, boolean shared) throws IOException {}
+ public void release(Path f) throws IOException {}
+
+ /** copy/move operations are not supported */
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {}
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {}
+ public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
+ throws IOException {}
+
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return fsOutputFile;
+ }
+
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ }
+
+ public void reportChecksumFailure(Path p, FSInputStream in,
+ long inPos,
+ FSInputStream sums, long sumsPos) {
+ }
+
+ public long getBlockSize(Path f) throws IOException {
+ return getDefaultBlockSize();
+ }
+
+ public long getDefaultBlockSize() {
+ return 32 * 1024; //some random large number. can be anything actually
+ }
+
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ /** Some APIs exclusively for InMemoryFileSystem */
+
+ /** Register a path with its size. This will also register a checksum for
+  * the file that the user is trying to create. This is required since none
+  * of the FileSystem APIs accept the size of the file as argument. But since
+  * we need to know the size of the file a priori, the user must call this
+  * method for each file he wants to create and reserve memory for that file.
+  * We either succeed in reserving memory for both the main file and the
+  * checksum file and return true, or return false.
+  * @param f the path of the file that is going to be created
+  * @param size the length in bytes to reserve for that file
+  * @return true if space for the file and its checksum file was reserved
+  */
+ public boolean reserveSpaceWithCheckSum(Path f, int size) {
+   // A negative size can never be honoured (allocating a negative-length
+   // array throws), so refuse it like any other request that does not fit.
+   if (size < 0) return false;
+   //get the size of the checksum file (we know it is going to be 'int'
+   //since this is an inmem fs with file sizes that will fit in 4 bytes)
+   int checksumSize = getChecksumFileLength(size);
+   // Add in long arithmetic so an oversized request cannot wrap around int
+   // and appear to fit.
+   long required = (long)size + checksumSize;
+   if (required > Integer.MAX_VALUE) return false;
+   synchronized (this) {
+     if (!canFitInMemory((int)required)) return false;
+     FileAttributes fileAttr;
+     FileAttributes checksumAttr;
+     try {
+       fileAttr = new FileAttributes(size);
+       checksumAttr = new FileAttributes(checksumSize);
+     } catch (OutOfMemoryError o) {
+       return false;
+     }
+     totalUsed += (int)required;
+     tempFileAttribs.put(getPath(f), fileAttr);
+     tempFileAttribs.put(getPath(FileSystem.getChecksumFile(f)),checksumAttr);
+     return true;
+   }
+ }
+
+ public int getChecksumFileLength(int size) {
+ return (int)super.getChecksumFileLength(size, bytesPerSum);
+ }
+
+ /** List the closed files of this filesystem that the given filter accepts.
+  * This could have been implemented over listPathsRaw, but maintaining
+  * directory structures is an overhead for this impl of the in-memory fs.
+  * @param filter decides which paths are included in the result
+  * @return the accepted paths; an empty array if there are no files
+  */
+ public Path[] getFiles(PathFilter filter) {
+   synchronized (this) {
+     Set<String> names = pathToFileAttribs.keySet();
+     if (names == null || names.isEmpty()) {
+       return new Path[0];
+     }
+     // first pass: collect the names the filter lets through
+     List<String> accepted = new ArrayList<String>();
+     for (String name : names) {
+       if (filter.accept(new Path(name))) {
+         accepted.add(name);
+       }
+     }
+     // second pass: turn the accepted names into fresh Path objects
+     Path[] results = new Path[accepted.size()];
+     int next = 0;
+     for (String name : accepted) {
+       results[next++] = new Path(name);
+     }
+     return results;
+   }
+ }
+
+ public int getFSSize() {
+ return fsSize;
+ }
+
+ public float getPercentUsed() {
+ return (float)totalUsed/fsSize;
+ }
+
+ /**
+  * Tell whether size more bytes still fit under the configured capacity.
+  * @param size the number of bytes about to be reserved
+  * @return true iff size plus the bytes already in use is below fsSize
+  */
+ private boolean canFitInMemory(int size) {
+   // Widen to long before adding: size + totalUsed can overflow int, turn
+   // negative and then wrongly compare as smaller than fsSize.
+   return (long)size + totalUsed < fsSize;
+ }
+
+ private String getPath(Path f) {
+ return f.toUri().getPath();
+ }
+
+ private static class FileAttributes {
+ private byte[] data;
+ private int size;
+
+ public FileAttributes(int size) {
+ this.size = size;
+ this.data = new byte[size];
+ }
+ }
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 19 10:17:44 2007
@@ -21,6 +21,8 @@
import java.io.*;
import java.util.*;
import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.lucene.util.PriorityQueue;
@@ -1893,7 +1895,8 @@
}
/**
- * Merges the contents of files passed in Path[]
+ * Merges the contents of files passed in Path[] using a max factor value
+ * that is already set
* @param inNames the array of path names
* @param deleteInputs true if the input files should be deleted when
* unnecessary
@@ -1902,6 +1905,22 @@
*/
public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs)
throws IOException {
+ return merge(inNames, deleteInputs,
+ (inNames.length < factor) ? inNames.length : factor);
+ }
+
+ /**
+ * Merges the contents of files passed in Path[]
+ * @param inNames the array of path names
+ * @param deleteInputs true if the input files should be deleted when
+ * unnecessary
+ * @param factor the factor that will be used as the maximum merge fan-in
+ * @return RawKeyValueIteratorMergeQueue
+ * @throws IOException
+ */
+ public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
+ int factor)
+ throws IOException {
//get the segments from inNames
ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
for (int i = 0; i < inNames.length; i++) {
@@ -1911,7 +1930,7 @@
s.doSync();
a.add(s);
}
- factor = (inNames.length < factor) ? inNames.length : factor;
+ this.factor = factor;
MergeQueue mQueue = new MergeQueue(a);
return mQueue.merge();
}
@@ -1948,26 +1967,48 @@
/**
* Clones the attributes (like compression of the input file and creates a
* corresponding Writer
- * @param fileSys the FileSystem object
+ * @param ignoredFileSys the (ignored) FileSystem object
* @param inputFile the path of the input file whose attributes should be
* cloned
* @param outputFile the path of the output file
* @param prog the Progressable to report status during the file write
* @return Writer
* @throws IOException
+ * @deprecated call #cloneFileAttributes(Path,Path,Progressable) instead
*/
- public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile,
- Path outputFile, Progressable prog) throws IOException {
- Reader reader = new Reader(fileSys, inputFile, 4096, conf);
+ public Writer cloneFileAttributes(FileSystem ignoredFileSys,
+ Path inputFile, Path outputFile, Progressable prog)
+ throws IOException {
+ return cloneFileAttributes(inputFile, outputFile, prog);
+ }
+
+ /**
+ * Clones the attributes (like compression of the input file and creates a
+ * corresponding Writer
+ * @param inputFile the path of the input file whose attributes should be
+ * cloned
+ * @param outputFile the path of the output file
+ * @param prog the Progressable to report status during the file write
+ * @return Writer
+ * @throws IOException
+ */
+ public Writer cloneFileAttributes(Path inputFile, Path outputFile,
+ Progressable prog) throws IOException {
+ FileSystem srcFileSys = inputFile.getFileSystem(conf);
+ Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
boolean compress = reader.isCompressed();
boolean blockCompress = reader.isBlockCompressed();
CompressionCodec codec = reader.getCompressionCodec();
reader.close();
+
+ FileSystem dstFileSys = outputFile.getFileSystem(conf);
FSDataOutputStream out;
if (prog != null)
- out = fs.create(outputFile, true, memory/(factor+1), prog);
+ out = dstFileSys.create(outputFile, true,
+ conf.getInt("io.file.buffer.size", 4096), prog);
else
- out = fs.create(outputFile, true, memory/(factor+1));
+ out = dstFileSys.create(outputFile, true,
+ conf.getInt("io.file.buffer.size", 4096));
Writer writer = createWriter(conf, out, keyClass, valClass, compress,
blockCompress, codec);
return writer;
@@ -2158,7 +2199,8 @@
Path outputFile = conf.getLocalPath("mapred.local.dir",
(outFile.suffix("." + passNo)).toString());
Writer writer = cloneFileAttributes(fs,
- mStream[0].segmentPathName, outputFile, null);
+ fs.makeQualified(mStream[0].segmentPathName),
+ fs.makeQualified(outputFile), null);
writer.sync = null; //disable sync for temp files
writeFile(this, writer);
writer.close();
@@ -2276,7 +2318,7 @@
public boolean nextRawKey() throws IOException {
if (in == null) {
Reader reader = new Reader(fs, segmentPathName,
- memory/(factor+1), segmentOffset,
+ conf.getInt("io.file.buffer.size", 4096), segmentOffset,
segmentLength, conf);
//sometimes we ignore syncs especially for temp merge files
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Fri Jan 19 10:17:44 2007
@@ -29,6 +29,19 @@
public static final long HEARTBEAT_INTERVAL = 10 * 1000;
public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+ //for the inmemory filesystem (to do in-memory merge)
+ /**
+ * Constant denoting when a merge of in memory files will be triggered
+ */
+ public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+ /**
+ * Constant denoting the max size (in terms of the fraction of the total
+ * size of the filesys) of a map output file that we will try
+ * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
+ */
+ public static final float MAX_INMEM_FILESIZE_FRACTION =
+ MAX_INMEM_FILESYS_USE/2;
+
//
// Result codes
//
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Jan 19 10:17:44 2007
@@ -22,6 +22,8 @@
import java.io.*;
import java.net.*;
+
+import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
@@ -29,7 +31,7 @@
/** The location of a map output file, as passed to a reduce task via the
* {@link InterTrackerProtocol}. */
-class MapOutputLocation implements Writable {
+class MapOutputLocation implements Writable, MRConstants {
static { // register a ctor
WritableFactories.setFactory
@@ -162,4 +164,96 @@
}
return totalBytes;
}
+
+ /**
+  * Get the map output into a local file (either in the inmemory fs or on the
+  * local fs) from the remote server.
+  * We use the file system so that we generate checksum files on the data.
+  * The output is kept in memory only if it (with its checksum file) is at
+  * most MAX_INMEM_FILESIZE_FRACTION of the in-memory filesystem and space for
+  * it can be reserved there; otherwise it is written to the local fs.
+  * On any failure the partially written file is deleted (best effort) and
+  * the failure is reported to the caller as an exception.
+  * @param inMemFileSys the inmemory filesystem to write the file to
+  * @param localFileSys the local filesystem to write the file to
+  * @param localFilename the filename to write the data into
+  * @param reduce the reduce id to get for
+  * @param timeout number of ms for connection and read timeout
+  * @return the fully qualified path of the file that got created
+  * @throws IOException when something goes wrong, including when fewer or
+  *         more bytes than the announced content length were received
+  * @throws InterruptedException if the current thread is found interrupted
+  *         while copying
+  */
+ public Path getFile(InMemoryFileSystem inMemFileSys,
+                     FileSystem localFileSys,
+                     Path localFilename,
+                     int reduce,
+                     int timeout) throws IOException, InterruptedException {
+   boolean good = false;
+   long totalBytes = 0;
+   FileSystem fileSys = localFileSys;
+   Thread currentThread = Thread.currentThread();
+   URL path = new URL(toString() + "&reduce=" + reduce);
+   try {
+     URLConnection connection = path.openConnection();
+     if (timeout > 0) {
+       connection.setConnectTimeout(timeout);
+       connection.setReadTimeout(timeout);
+     }
+     InputStream input = connection.getInputStream();
+     // From here on the input must be closed whatever happens, including a
+     // failure to create the output file.
+     try {
+       //We will put a file in memory if it meets certain criteria:
+       //1. The size of the file should be less than 25% of the total inmem fs
+       //2. There is space available in the inmem fs
+
+       int length = connection.getContentLength();
+       int inMemFSSize = inMemFileSys.getFSSize();
+       int checksumLength = inMemFileSys.getChecksumFileLength(length);
+
+       // getContentLength() is -1 when the length is unknown; such an output
+       // cannot be reserved in memory. The sum is widened to long so that it
+       // cannot overflow int.
+       boolean createInMem =
+         length >= 0 &&
+         ((float)((long)length + checksumLength) / inMemFSSize <=
+          MAX_INMEM_FILESIZE_FRACTION) &&
+         inMemFileSys.reserveSpaceWithCheckSum(localFilename, length);
+
+       if (createInMem)
+         fileSys = inMemFileSys;
+       else
+         fileSys = localFileSys;
+
+       OutputStream output = fileSys.create(localFilename);
+       try {
+         byte[] buffer = new byte[64 * 1024];
+         if (currentThread.isInterrupted()) {
+           throw new InterruptedException();
+         }
+         int len = input.read(buffer);
+         while (len > 0) {
+           totalBytes += len;
+           output.write(buffer, 0 ,len);
+           if (currentThread.isInterrupted()) {
+             throw new InterruptedException();
+           }
+           len = input.read(buffer);
+         }
+       } finally {
+         output.close();
+       }
+     } finally {
+       input.close();
+     }
+     // Compare as long; narrowing totalBytes to int could make a wrong byte
+     // count look right.
+     good = totalBytes == connection.getContentLength();
+     if (!good) {
+       throw new IOException("Incomplete map output received for " + path +
+                             " (" + totalBytes + " instead of " +
+                             connection.getContentLength() + ")");
+     }
+   } finally {
+     if (!good) {
+       // Clean up, but do NOT return from this finally block: a return here
+       // would discard the exception that is propagating to the caller.
+       try {
+         fileSys.delete(localFilename);
+       } catch (Throwable th) {
+         // IGNORED because we are cleaning up
+       }
+     }
+   }
+   return fileSys.makeQualified(localFilename);
+ }
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jan 19 10:17:44 2007
@@ -233,11 +233,18 @@
// open a file to collect map output
- Path[] mapFiles = new Path[numMaps];
+ // since we don't know how many map outputs got merged in memory, we have
+ // to check whether a given map output exists, and if it does, add it in
+ // the list of files to merge, otherwise not.
+ List <Path> mapFilesList = new ArrayList();
for(int i=0; i < numMaps; i++) {
- mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
+ Path f = mapOutputFile.getInputFile(i, getTaskId());
+ if (lfs.exists(f))
+ mapFilesList.add(f);
}
-
+ Path[] mapFiles = new Path[mapFilesList.size()];
+ mapFiles = mapFilesList.toArray(mapFiles);
+
// spawn a thread to give sort progress heartbeats
Thread sortProgress = new Thread() {
public void run() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Jan 19 10:17:44 2007
@@ -18,16 +18,22 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
import org.apache.hadoop.util.*;
import java.io.*;
import java.util.*;
+import java.net.*;
import java.text.DecimalFormat;
import org.apache.hadoop.util.Progressable;
/** Runs a reduce task. */
-class ReduceTaskRunner extends TaskRunner {
+class ReduceTaskRunner extends TaskRunner implements MRConstants {
/** Number of ms before timing out a copy */
private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
@@ -80,11 +86,31 @@
private long lastPollTime;
/**
+ * A reference to the in memory file system for writing the map outputs to.
+ */
+ private InMemoryFileSystem inMemFileSys;
+
+ /**
* A reference to the local file system for writing the map outputs to.
*/
private FileSystem localFileSys;
/**
+ * An instance of the sorter used for doing merge
+ */
+ private SequenceFile.Sorter sorter;
+
+ /**
+ * A reference to the throwable object (if merge throws an exception)
+ */
+ private volatile Throwable mergeThrowable;
+
+ /**
+ * A flag to indicate that merge is in progress
+ */
+ private volatile boolean mergeInProgress = false;
+
+ /**
* The threads for fetching the files.
*/
private MapOutputCopier[] copiers = null;
@@ -120,9 +146,28 @@
public MapOutputLocation getLocation() { return loc; }
}
- private class PingTimer implements Progressable {
+ private class PingTimer extends Thread implements Progressable {
Task task = getTask();
TaskTracker tracker = getTracker();
+
+ /**
+  * Report progress once every Task.PROGRESS_INTERVAL until this thread is
+  * interrupted. A failure to report is logged and does not end the loop.
+  */
+ public void run() {
+   LOG.info(task.getTaskId() + " Started thread: " + getName());
+   while (true) {
+     try {
+       progress();
+     }
+     catch (Throwable e) {
+       LOG.info(task.getTaskId() + " Thread Exception in " +
+                "reporting sort progress\n" +
+                StringUtils.stringifyException(e));
+     }
+     // Sleep after a failed report as well. Skipping the sleep would turn a
+     // persistently failing progress() into a tight, log-flooding loop.
+     try {
+       Thread.sleep(Task.PROGRESS_INTERVAL);
+     }
+     catch (InterruptedException i) {
+       return;
+     }
+   }
+ }
public void progress() {
task.reportProgress(tracker);
@@ -134,7 +179,6 @@
/** Copies map outputs as they become available */
private class MapOutputCopier extends Thread {
- private PingTimer pingTimer = new PingTimer();
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
@@ -196,7 +240,7 @@
try {
start(loc);
- size = copyOutput(loc, pingTimer);
+ size = copyOutput(loc);
} catch (IOException e) {
LOG.warn(reduceTask.getTaskId() + " copy failed: " +
loc.getMapTaskId() + " from " + loc.getHost());
@@ -215,13 +259,11 @@
/** Copies a map output from a remote host, using raw RPC.
* @param loc the map output location to be copied
- * @param pingee a status object to ping as we make progress
- * @return the size of the copied file
+ * @return the size of the copied file, in bytes
* @throws IOException if there is an error copying the file
* @throws InterruptedException if the copier should give up
*/
- private long copyOutput(MapOutputLocation loc,
- Progressable pingee
+ private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
String reduceId = reduceTask.getTaskId();
@@ -233,21 +275,42 @@
// a working filename that will be unique to this attempt
Path tmpFilename = new Path(finalFilename + "-" + id);
// this copies the map output file
- long bytes = loc.getFile(localFileSys, tmpFilename,
- reduceTask.getPartition(), pingee,
+ tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
+ reduceTask.getPartition(),
STALLED_COPY_TIMEOUT);
+ if (tmpFilename == null)
+ throw new IOException("File " + finalFilename + "-" + id +
+ " not created");
+ long bytes = -1;
// lock the ReduceTaskRunner while we do the rename
synchronized (ReduceTaskRunner.this) {
- // if we can't rename the file, something is broken
- if (!(new File(tmpFilename.toString()).
- renameTo(new File(finalFilename.toString())))) {
- localFileSys.delete(tmpFilename);
+ // if we can't rename the file, something is broken (and IOException
+ // will be thrown). This file could have been created in the inmemory
+ // fs or the localfs. So need to get the filesystem owning the path.
+ FileSystem fs = tmpFilename.getFileSystem(conf);
+ if (!fs.rename(tmpFilename, finalFilename)) {
+ fs.delete(tmpFilename);
throw new IOException("failure to rename map output " + tmpFilename);
}
+ bytes = fs.getLength(finalFilename);
+ LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
+ " output from " + loc.getHost() + ".");
+ //Create a thread to do merges. Synchronize access/update to
+ //mergeInProgress
+ if (!mergeInProgress && inMemFileSys.getPercentUsed() >=
+ MAX_INMEM_FILESYS_USE) {
+ LOG.info(reduceId + " InMemoryFileSystem " +
+ inMemFileSys.getUri().toString() +
+ " is " + inMemFileSys.getPercentUsed() +
+ " full. Triggering merge");
+ InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
+ (LocalFileSystem)localFileSys, sorter);
+ m.setName("Thread for merging in memory files");
+ m.setDaemon(true);
+ mergeInProgress = true;
+ m.start();
+ }
}
- LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
- " output from " + loc.getHost() + ".");
-
return bytes;
}
@@ -258,7 +321,6 @@
super(task, tracker, conf);
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
- localFileSys = FileSystem.getLocal(conf);
this.reduceTask = (ReduceTask)getTask();
this.scheduledCopies = new ArrayList(100);
@@ -266,6 +328,18 @@
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+ //we want to distinguish inmem fs instances for different reduces. Hence,
+ //append a unique string in the uri for the inmem fs name
+ URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
+ inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
+ LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " +
+ uri);
+ localFileSys = FileSystem.getLocal(conf);
+ //create an instance of the sorter
+ sorter =
+ new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
+ conf.getMapOutputValueClass(), conf);
+
// hosts -> next contact time
this.penaltyBox = new Hashtable();
@@ -311,9 +385,14 @@
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
-
+ PingTimer pingTimer = new PingTimer();
+ pingTimer.setName("Map output copy reporter for task " +
+ reduceTask.getTaskId());
+ pingTimer.setDaemon(true);
+ pingTimer.start();
+ try {
// loop until we get all required outputs or are killed
- while (!killed && numCopied < numOutputs) {
+ while (!killed && numCopied < numOutputs && mergeThrowable == null) {
LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
" map output(s)");
@@ -382,12 +461,11 @@
// new, just wait for a bit
try {
if (numInFlight == 0 && numScheduled == 0) {
- getTask().reportProgress(getTracker());
Thread.sleep(5000);
}
} catch (InterruptedException e) { } // IGNORE
- while (!killed && numInFlight > 0) {
+ while (!killed && numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
CopyResult cr = getCopyResult();
@@ -404,7 +482,6 @@
copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs +
" at " +
mbpsFormat.format(transferRate) + " MB/s)");
- getTask().reportProgress(getTracker());
} else {
// this copy failed, put it back onto neededOutputs
neededOutputs.add(new Integer(cr.getMapId()));
@@ -454,7 +531,64 @@
}
}
- return numCopied == numOutputs && !killed;
+ if (mergeThrowable != null) {
+ //set the task state to FAILED
+ TaskTracker tracker = ReduceTaskRunner.this.getTracker();
+ TaskTracker.TaskInProgress tip =
+ tracker.runningTasks.get(reduceTask.getTaskId());
+ tip.runstate = TaskStatus.State.FAILED;
+ try {
+ tip.cleanup();
+ } catch (Throwable ie2) {
+ // Ignore it, we are just trying to cleanup.
+ }
+ inMemFileSys.close();
+ }
+
+ //Do a merge of in-memory files (if there are any)
+ if (!killed && mergeThrowable == null) {
+ try {
+ //wait for an ongoing merge (if it is in flight) to complete
+ while (mergeInProgress) {
+ Thread.sleep(200);
+ }
+ LOG.info(reduceTask.getTaskId() +
+ " Copying of all map outputs complete. " +
+ "Initiating the last merge on the remaining files in " +
+ inMemFileSys.getUri());
+ //initiate merge
+ Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+ if (inMemClosedFiles.length == 0) {
+ LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
+ inMemFileSys.getUri());
+ return numCopied == numOutputs;
+ }
+ RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
+ inMemClosedFiles.length);
+ //name this output file same as the name of the first file that is
+ //there in the current list of inmem files (this is guaranteed to be
+ //absent on the disk currently. So we don't overwrite a prev.
+ //created spill)
+ SequenceFile.Writer writer = sorter.cloneFileAttributes(
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
+ sorter.writeFile(rIter, writer);
+ writer.close();
+ LOG.info(reduceTask.getTaskId() +
+ " Merge of the " +inMemClosedFiles.length +
+ " files in InMemoryFileSystem complete." +
+ " Local file is " + inMemClosedFiles[0]);
+ } catch (Throwable t) {
+ LOG.warn("Merge of the inmemory files threw an exception: " +
+ StringUtils.stringifyException(t));
+ inMemFileSys.close();
+ return false;
+ }
+ }
+ return mergeThrowable == null && numCopied == numOutputs && !killed;
+ } finally {
+ pingTimer.interrupt();
+ }
}
@@ -529,4 +663,60 @@
}
}
+ private class InMemFSMergeThread extends Thread { // merges map outputs held in the in-memory fs into one local file
+ private InMemoryFileSystem inMemFileSys; // file system the merge inputs are read from
+ private LocalFileSystem localFileSys; // file system the merged output is written to
+ private SequenceFile.Sorter sorter; // performs the merge and writes the merged file
+
+ public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
+ LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+ this.inMemFileSys = inMemFileSys;
+ this.localFileSys = localFileSys;
+ this.sorter = sorter;
+ }
+ public void run() { // runs one merge attempt, records any failure, then clears mergeInProgress
+ LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
+ try {
+ //initiate merge: list the in-memory files accepted by MAP_OUTPUT_FILTER
+ Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+ //Note that the above Path[] could be of length 0 if all copies are
+ //in flight. So we make sure that we have some 'closed' map
+ //output files to merge to get the benefit of in-memory merge
+ if (inMemClosedFiles.length >=
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) { // NOTE(review): both constants are declared outside this hunk
+ RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, // NOTE(review): 'true' presumably deletes the inputs - confirm against SequenceFile.Sorter
+ inMemClosedFiles.length);
+ //name this output file same as the name of the first file that is
+ //there in the current list of inmem files (this is guaranteed to be
+ //absent on the disk currently. So we don't overwrite a prev.
+ //created spill)
+ SequenceFile.Writer writer = sorter.cloneFileAttributes(
+ inMemFileSys.makeQualified(inMemClosedFiles[0]),
+ localFileSys.makeQualified(inMemClosedFiles[0]), null);
+ sorter.writeFile(rIter, writer);
+ writer.close();
+ LOG.info(reduceTask.getTaskId() +
+ " Merge of the " +inMemClosedFiles.length +
+ " files in InMemoryFileSystem complete." +
+ " Local file is " + inMemClosedFiles[0]);
+ }
+ else { // fewer closed files than the threshold: skip the merge this time
+ LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
+ inMemFileSys.getUri());
+ }
+ } catch (Throwable t) {
+ LOG.warn("Merge of the inmemory files threw an exception: " +
+ StringUtils.stringifyException(t));
+ ReduceTaskRunner.this.mergeThrowable = t; // the runner's copy loops stop once this is non-null
+ }
+ finally {
+ mergeInProgress = false; // cleared on success and on failure so a later merge can be triggered
+ }
+ }
+ }
+ final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() { // used with getFiles() to pick the files to merge
+ public boolean accept(Path file) {
+ return file.toString().endsWith(".out"); // accept only paths whose string form ends in ".out"
+ }
+ };
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jan 19 10:17:44 2007
@@ -885,7 +885,7 @@
class TaskInProgress {
Task task;
float progress;
- TaskStatus.State runstate;
+ volatile TaskStatus.State runstate;
long lastProgressReport;
StringBuffer diagnosticInfo = new StringBuffer();
TaskRunner runner;