You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 05:40:08 UTC
svn commit: r1079151 -
/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
Author: omalley
Date: Tue Mar 8 04:40:08 2011
New Revision: 1079151
URL: http://svn.apache.org/viewvc?rev=1079151&view=rev
Log:
commit 446e41f2424ddc866770cda677450c18feeb597f
Author: Chris Douglas <cd...@apache.org>
Date: Sun Oct 3 02:00:15 2010 -0700
Apply y201 to trunk
Modified:
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1079151&r1=1079150&r2=1079151&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Tue Mar 8 04:40:08 2011
@@ -128,8 +128,26 @@ public class LocalDirAllocator {
*/
public Path getLocalPathForWrite(String pathStr, long size,
Configuration conf) throws IOException {
+ return getLocalPathForWrite(pathStr, size, conf, true);
+ }
+
+ /** Get a path from the local FS. Pass size as
+ * SIZE_UNKNOWN if not known a priori. We
+ * round-robin over the set of disks (via the configured dirs) and return
+ * the first complete path which has enough space
+ * @param pathStr the requested path (this will be created on the first
+ * available disk)
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @param checkWrite ensure that the path is writable
+ * @return the complete path to the file on a local disk
+ * @throws IOException
+ */
+ public Path getLocalPathForWrite(String pathStr, long size,
+ Configuration conf,
+ boolean checkWrite) throws IOException {
AllocatorPerContext context = obtainContext(contextCfgItemName);
- return context.getLocalPathForWrite(pathStr, size, conf);
+ return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
}
/** Get a path from the local FS for reading. We search through all the
@@ -146,6 +164,23 @@ public class LocalDirAllocator {
return context.getLocalPathToRead(pathStr, conf);
}
+ /**
+  * Find every existing instance of a path across the working directories.
+  * The lookup itself is delegated to the allocator context registered for
+  * this allocator's configuration item.
+  * @param pathStr the path underneath the roots
+  * @param conf the configuration to look up the roots in
+  * @return all of the paths that exist under any of the roots
+  * @throws IOException if the delegated lookup fails
+  */
+ public Iterable<Path> getAllLocalPathsToRead(String pathStr,
+     Configuration conf
+     ) throws IOException {
+   final AllocatorPerContext allocator;
+   // Only the context lookup runs under this object's monitor; the
+   // delegated call below happens after the monitor has been released.
+   synchronized (this) {
+     allocator = obtainContext(contextCfgItemName);
+   }
+   return allocator.getAllLocalPathsToRead(pathStr, conf);
+ }
+
/** Creates a temporary file in the local FS. Pass size as -1 if not known
* apriori. We round-robin over the set of disks (via the configured dirs)
* and select the first complete path which has enough space. A file is
@@ -214,7 +249,8 @@ public class LocalDirAllocator {
/** This method gets called everytime before any read/write to make sure
* that any change to localDirs is reflected immediately.
*/
- private void confChanged(Configuration conf) throws IOException {
+ private synchronized void confChanged(Configuration conf
+ ) throws IOException {
String newLocalDirs = conf.get(contextCfgItemName);
if (!newLocalDirs.equals(savedLocalDirs)) {
localDirs = conf.getTrimmedStrings(contextCfgItemName);
@@ -252,18 +288,21 @@ public class LocalDirAllocator {
}
}
- private Path createPath(String path) throws IOException {
+ private Path createPath(String path,
+ boolean checkWrite) throws IOException {
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
path);
- //check whether we are able to create a directory here. If the disk
- //happens to be RDONLY we will fail
- try {
- DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
- return file;
- } catch (DiskErrorException d) {
- LOG.warn(StringUtils.stringifyException(d));
- return null;
+ if (checkWrite) {
+ //check whether we are able to create a directory here. If the disk
+ //happens to be RDONLY we will fail
+ try {
+ DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+ } catch (DiskErrorException d) {
+ LOG.warn(StringUtils.stringifyException(d));
+ return null;
+ }
}
+ return file;
}
/**
@@ -274,17 +313,6 @@ public class LocalDirAllocator {
return dirNumLastAccessed;
}
- /** Get a path from the local FS. This method should be used if the size of
- * the file is not known a priori.
- *
- * It will use roulette selection, picking directories
- * with probability proportional to their available space.
- */
- public synchronized Path getLocalPathForWrite(String path,
- Configuration conf) throws IOException {
- return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
- }
-
/** Get a path from the local FS. If size is known, we go
* round-robin over the set of disks (via the configured dirs) and return
* the first complete path which has enough space.
@@ -292,8 +320,10 @@ public class LocalDirAllocator {
* If size is not known, use roulette selection -- pick directories
* with probability proportional to their available space.
*/
- public synchronized Path getLocalPathForWrite(String pathStr, long size,
- Configuration conf) throws IOException {
+ public synchronized
+ Path getLocalPathForWrite(String pathStr, long size,
+ Configuration conf, boolean checkWrite
+ ) throws IOException {
confChanged(conf);
int numDirs = localDirs.length;
int numDirsSearched = 0;
@@ -325,7 +355,7 @@ public class LocalDirAllocator {
dir++;
}
dirNumLastAccessed = dir;
- returnPath = createPath(pathStr);
+ returnPath = createPath(pathStr, checkWrite);
if (returnPath == null) {
totalAvailable -= availableOnDisk[dir];
availableOnDisk[dir] = 0; // skip this disk
@@ -336,7 +366,7 @@ public class LocalDirAllocator {
while (numDirsSearched < numDirs && returnPath == null) {
long capacity = dirDF[dirNumLastAccessed].getAvailable();
if (capacity > size) {
- returnPath = createPath(pathStr);
+ returnPath = createPath(pathStr, checkWrite);
}
dirNumLastAccessed++;
dirNumLastAccessed = dirNumLastAccessed % numDirs;
@@ -362,7 +392,7 @@ public class LocalDirAllocator {
Configuration conf) throws IOException {
// find an appropriate directory
- Path path = getLocalPathForWrite(pathStr, size, conf);
+ Path path = getLocalPathForWrite(pathStr, size, conf, true);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
@@ -399,6 +429,76 @@ public class LocalDirAllocator {
" the configured local directories");
}
+ /**
+  * Walks an array of root directories and yields, for each root in order,
+  * the path {@code root/pathStr} when it exists on the given file system.
+  * The next element is always looked up ahead of the caller, so
+  * {@link #hasNext()} is a plain field check and does no file system access.
+  * The object is its own {@link Iterable}: {@link #iterator()} hands back
+  * this same instance, so the sequence can be traversed only once.
+  */
+ private static
+ class PathIterator implements Iterator<Path>, Iterable<Path> {
+   private final FileSystem fs;
+   private final String pathStr;
+   /** Index of the next entry of {@link #rootDirs} still to be examined. */
+   private int i = 0;
+   private final String[] rootDirs;
+   /** Pre-fetched element, or null once every root has been examined. */
+   private Path next = null;
+
+   /**
+    * @param fs the file system used for the existence checks
+    * @param pathStr the path to resolve underneath each root
+    * @param rootDirs the roots to examine, in iteration order
+    * @throws IOException if the existence check for the first hit fails
+    */
+   private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
+       ) throws IOException {
+     this.fs = fs;
+     this.pathStr = pathStr;
+     this.rootDirs = rootDirs;
+     advance();
+   }
+
+   @Override
+   public boolean hasNext() {
+     return next != null;
+   }
+
+   /**
+    * Moves {@link #next} to the next existing path, or sets it to null when
+    * the roots are exhausted.
+    * @throws IOException if the file system cannot be queried
+    */
+   private void advance() throws IOException {
+     while (i < rootDirs.length) {
+       next = new Path(rootDirs[i++], pathStr);
+       if (fs.exists(next)) {
+         return;
+       }
+     }
+     next = null;
+   }
+
+   /**
+    * @return the pre-fetched path
+    * @throws java.util.NoSuchElementException if no element is left
+    * @throws RuntimeException wrapping the IOException raised while looking
+    *         up the following element
+    */
+   @Override
+   public Path next() {
+     // Iterator contract: exhaustion is signalled by an exception, never by
+     // handing null back to the caller.
+     if (next == null) {
+       throw new java.util.NoSuchElementException(
+           "no more paths for " + pathStr);
+     }
+     Path result = next;
+     try {
+       advance();
+     } catch (IOException ie) {
+       // advance() assigns the candidate before checking it, so 'next' names
+       // the path whose check failed.
+       throw new RuntimeException("Can't check existance of " + next, ie);
+     }
+     return result;
+   }
+
+   @Override
+   public void remove() {
+     throw new UnsupportedOperationException("read only iterator");
+   }
+
+   @Override
+   public Iterator<Path> iterator() {
+     return this;
+   }
+ }
+
+ /**
+  * Enumerate every existing copy of a path under the configured local
+  * directories. The configured directory list is refreshed from
+  * {@code conf} first, and a single leading "/" on the requested path is
+  * dropped before it is resolved against each root.
+  * @param pathStr the path underneath the roots
+  * @param conf the configuration to look up the roots in
+  * @return all of the paths that exist under any of the roots
+  * @throws IOException if refreshing the directory list or checking for
+  *         the first existing path fails
+  */
+ synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+     Configuration conf
+     ) throws IOException {
+   confChanged(conf);
+   final String relative =
+       pathStr.startsWith("/") ? pathStr.substring(1) : pathStr;
+   return new PathIterator(localFS, relative, localDirs);
+ }
+
/** We search through all the configured dirs for the file's existence
* and return true when we find one
*/