You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/06/07 16:16:23 UTC
hadoop git commit: HADOOP-10048. LocalDirAllocator should avoid
holding locks while accessing the filesystem. Contributed by Jason Lowe.
Repository: hadoop
Updated Branches:
refs/heads/trunk e62053030 -> c14c1b298
HADOOP-10048. LocalDirAllocator should avoid holding locks while accessing the filesystem. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c14c1b29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c14c1b29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c14c1b29
Branch: refs/heads/trunk
Commit: c14c1b298e29e799f7c8f15ff24d7eba6e0cd39b
Parents: e620530
Author: Junping Du <ju...@apache.org>
Authored: Tue Jun 7 09:18:58 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Tue Jun 7 09:18:58 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/fs/LocalDirAllocator.java | 153 ++++++++++++-------
1 file changed, 94 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c14c1b29/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index 70cf87d..b14e1f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.*;
-
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -247,74 +248,101 @@ public class LocalDirAllocator {
private final Log LOG =
LogFactory.getLog(AllocatorPerContext.class);
- private int dirNumLastAccessed;
private Random dirIndexRandomizer = new Random();
- private FileSystem localFS;
- private DF[] dirDF = new DF[0];
private String contextCfgItemName;
- private String[] localDirs = new String[0];
- private String savedLocalDirs = "";
+
+ // NOTE: the context must be accessed via a local reference as it
+ // may be updated at any time to reference a different context
+ private AtomicReference<Context> currentContext;
+
+ private static class Context {
+ private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
+ private FileSystem localFS;
+ private DF[] dirDF;
+ private Path[] localDirs;
+ private String savedLocalDirs;
+
+ public int getAndIncrDirNumLastAccessed() {
+ return getAndIncrDirNumLastAccessed(1);
+ }
+
+ public int getAndIncrDirNumLastAccessed(int delta) {
+ if (localDirs.length < 2 || delta == 0) {
+ return dirNumLastAccessed.get();
+ }
+ int oldval, newval;
+ do {
+ oldval = dirNumLastAccessed.get();
+ newval = (oldval + delta) % localDirs.length;
+ } while (!dirNumLastAccessed.compareAndSet(oldval, newval));
+ return oldval;
+ }
+ }
public AllocatorPerContext(String contextCfgItemName) {
this.contextCfgItemName = contextCfgItemName;
+ this.currentContext = new AtomicReference<Context>(new Context());
}
/** This method gets called everytime before any read/write to make sure
* that any change to localDirs is reflected immediately.
*/
- private synchronized void confChanged(Configuration conf)
+ private Context confChanged(Configuration conf)
throws IOException {
+ Context ctx = currentContext.get();
String newLocalDirs = conf.get(contextCfgItemName);
if (null == newLocalDirs) {
throw new IOException(contextCfgItemName + " not configured");
}
- if (!newLocalDirs.equals(savedLocalDirs)) {
- localDirs = StringUtils.getTrimmedStrings(newLocalDirs);
- localFS = FileSystem.getLocal(conf);
- int numDirs = localDirs.length;
- ArrayList<String> dirs = new ArrayList<String>(numDirs);
+ if (!newLocalDirs.equals(ctx.savedLocalDirs)) {
+ ctx = new Context();
+ String[] dirStrings = StringUtils.getTrimmedStrings(newLocalDirs);
+ ctx.localFS = FileSystem.getLocal(conf);
+ int numDirs = dirStrings.length;
+ ArrayList<Path> dirs = new ArrayList<Path>(numDirs);
ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
for (int i = 0; i < numDirs; i++) {
try {
// filter problematic directories
- Path tmpDir = new Path(localDirs[i]);
- if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
+ Path tmpDir = new Path(dirStrings[i]);
+ if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) {
try {
-
File tmpFile = tmpDir.isAbsolute()
- ? new File(localFS.makeQualified(tmpDir).toUri())
- : new File(localDirs[i]);
+ ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
+ : new File(dirStrings[i]);
DiskChecker.checkDir(tmpFile);
- dirs.add(tmpFile.getPath());
+ dirs.add(new Path(tmpFile.getPath()));
dfList.add(new DF(tmpFile, 30000));
-
} catch (DiskErrorException de) {
- LOG.warn( localDirs[i] + " is not writable\n", de);
+ LOG.warn(dirStrings[i] + " is not writable\n", de);
}
} else {
- LOG.warn( "Failed to create " + localDirs[i]);
+ LOG.warn("Failed to create " + dirStrings[i]);
}
} catch (IOException ie) {
- LOG.warn( "Failed to create " + localDirs[i] + ": " +
+ LOG.warn("Failed to create " + dirStrings[i] + ": " +
ie.getMessage() + "\n", ie);
} //ignore
}
- localDirs = dirs.toArray(new String[dirs.size()]);
- dirDF = dfList.toArray(new DF[dirs.size()]);
- savedLocalDirs = newLocalDirs;
-
+ ctx.localDirs = dirs.toArray(new Path[dirs.size()]);
+ ctx.dirDF = dfList.toArray(new DF[dirs.size()]);
+ ctx.savedLocalDirs = newLocalDirs;
+
if (dirs.size() > 0) {
// randomize the first disk picked in the round-robin selection
- dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
+ ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size()));
}
+
+ currentContext.set(ctx);
}
+
+ return ctx;
}
- private Path createPath(String path,
+ private Path createPath(Path dir, String path,
boolean checkWrite) throws IOException {
- Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
- path);
+ Path file = new Path(dir, path);
if (checkWrite) {
//check whether we are able to create a directory here. If the disk
//happens to be RDONLY we will fail
@@ -334,7 +362,7 @@ public class LocalDirAllocator {
* @return the current directory index.
*/
int getCurrentDirectoryIndex() {
- return dirNumLastAccessed;
+ return currentContext.get().dirNumLastAccessed.get();
}
/** Get a path from the local FS. If size is known, we go
@@ -344,10 +372,10 @@ public class LocalDirAllocator {
* If size is not known, use roulette selection -- pick directories
* with probability proportional to their available space.
*/
- public synchronized Path getLocalPathForWrite(String pathStr, long size,
+ public Path getLocalPathForWrite(String pathStr, long size,
Configuration conf, boolean checkWrite) throws IOException {
- confChanged(conf);
- int numDirs = localDirs.length;
+ Context ctx = confChanged(conf);
+ int numDirs = ctx.localDirs.length;
int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked)
@@ -358,12 +386,12 @@ public class LocalDirAllocator {
if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size
- long[] availableOnDisk = new long[dirDF.length];
+ long[] availableOnDisk = new long[ctx.dirDF.length];
long totalAvailable = 0;
//build the "roulette wheel"
- for(int i =0; i < dirDF.length; ++i) {
- availableOnDisk[i] = dirDF[i].getAvailable();
+ for(int i =0; i < ctx.dirDF.length; ++i) {
+ availableOnDisk[i] = ctx.dirDF[i].getAvailable();
totalAvailable += availableOnDisk[i];
}
@@ -380,8 +408,8 @@ public class LocalDirAllocator {
randomPosition -= availableOnDisk[dir];
dir++;
}
- dirNumLastAccessed = dir;
- returnPath = createPath(pathStr, checkWrite);
+ ctx.dirNumLastAccessed.set(dir);
+ returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
if (returnPath == null) {
totalAvailable -= availableOnDisk[dir];
availableOnDisk[dir] = 0; // skip this disk
@@ -389,15 +417,21 @@ public class LocalDirAllocator {
}
}
} else {
- while (numDirsSearched < numDirs && returnPath == null) {
- long capacity = dirDF[dirNumLastAccessed].getAvailable();
+ int dirNum = ctx.getAndIncrDirNumLastAccessed();
+ while (numDirsSearched < numDirs) {
+ long capacity = ctx.dirDF[dirNum].getAvailable();
if (capacity > size) {
- returnPath = createPath(pathStr, checkWrite);
+ returnPath =
+ createPath(ctx.localDirs[dirNum], pathStr, checkWrite);
+ if (returnPath != null) {
+ ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
+ break;
+ }
}
- dirNumLastAccessed++;
- dirNumLastAccessed = dirNumLastAccessed % numDirs;
+ dirNum++;
+ dirNum = dirNum % numDirs;
numDirsSearched++;
- }
+ }
}
if (returnPath != null) {
return returnPath;
@@ -432,10 +466,10 @@ public class LocalDirAllocator {
* configured dirs for the file's existence and return the complete
* path to the file when we find one
*/
- public synchronized Path getLocalPathToRead(String pathStr,
+ public Path getLocalPathToRead(String pathStr,
Configuration conf) throws IOException {
- confChanged(conf);
- int numDirs = localDirs.length;
+ Context ctx = confChanged(conf);
+ int numDirs = ctx.localDirs.length;
int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked)
@@ -443,8 +477,8 @@ public class LocalDirAllocator {
pathStr = pathStr.substring(1);
}
while (numDirsSearched < numDirs) {
- Path file = new Path(localDirs[numDirsSearched], pathStr);
- if (localFS.exists(file)) {
+ Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
+ if (ctx.localFS.exists(file)) {
return file;
}
numDirsSearched++;
@@ -459,10 +493,10 @@ public class LocalDirAllocator {
private final FileSystem fs;
private final String pathStr;
private int i = 0;
- private final String[] rootDirs;
+ private final Path[] rootDirs;
private Path next = null;
- private PathIterator(FileSystem fs, String pathStr, String[] rootDirs)
+ private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs)
throws IOException {
this.fs = fs;
this.pathStr = pathStr;
@@ -517,21 +551,22 @@ public class LocalDirAllocator {
* @return all of the paths that exist under any of the roots
* @throws IOException
*/
- synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+ Iterable<Path> getAllLocalPathsToRead(String pathStr,
Configuration conf) throws IOException {
- confChanged(conf);
+ Context ctx = confChanged(conf);
if (pathStr.startsWith("/")) {
pathStr = pathStr.substring(1);
}
- return new PathIterator(localFS, pathStr, localDirs);
+ return new PathIterator(ctx.localFS, pathStr, ctx.localDirs);
}
/** We search through all the configured dirs for the file's existence
* and return true when we find one
*/
- public synchronized boolean ifExists(String pathStr,Configuration conf) {
+ public boolean ifExists(String pathStr, Configuration conf) {
+ Context ctx = currentContext.get();
try {
- int numDirs = localDirs.length;
+ int numDirs = ctx.localDirs.length;
int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked)
@@ -539,8 +574,8 @@ public class LocalDirAllocator {
pathStr = pathStr.substring(1);
}
while (numDirsSearched < numDirs) {
- Path file = new Path(localDirs[numDirsSearched], pathStr);
- if (localFS.exists(file)) {
+ Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
+ if (ctx.localFS.exists(file)) {
return true;
}
numDirsSearched++;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org