You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/09/17 21:51:28 UTC
svn commit: r998286 [2/2] - in /hadoop/hdfs/branches/HDFS-1052: ./
src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/ ...
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 17 19:51:27 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -95,6 +96,7 @@ import java.net.InetAddress;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -273,6 +275,9 @@ public class FSNamesystem implements FSC
// precision of access times.
private long accessTimePrecision = 0;
+ // lock to protect FSNamesystem.
+ private ReentrantReadWriteLock fsLock;
+
/**
* FSNamesystem constructor.
*/
@@ -293,6 +298,7 @@ public class FSNamesystem implements FSC
throws IOException {
this.systemStart = now();
this.blockManager = new BlockManager(this, conf);
+ this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf);
dtSecretManager = createDelegationTokenSecretManager(conf);
this.registerMBean(conf); // register the MBean for the FSNamesystemStatus
@@ -394,11 +400,33 @@ public class FSNamesystem implements FSC
return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
}
+ // utility methods to acquire and release read lock and write lock
+ void readLock() {
+ this.fsLock.readLock().lock();
+ }
+
+ void readUnlock() {
+ this.fsLock.readLock().unlock();
+ }
+
+ void writeLock() {
+ this.fsLock.writeLock().lock();
+ }
+
+ void writeUnlock() {
+ this.fsLock.writeLock().unlock();
+ }
+
+ boolean hasWriteLock() {
+ return this.fsLock.isWriteLockedByCurrentThread();
+ }
+
/**
* dirs is a list of directories where the filesystem directory state
* is stored
*/
FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+ this.fsLock = new ReentrantReadWriteLock(true);
this.blockManager = new BlockManager(this, conf);
setConfigurationParameters(conf);
this.dir = new FSDirectory(fsImage, this, conf);
@@ -539,7 +567,9 @@ public class FSNamesystem implements FSC
/**
* Dump all metadata into specified file
*/
- synchronized void metaSave(String filename) throws IOException {
+ void metaSave(String filename) throws IOException {
+ writeLock();
+ try {
checkSuperuserPrivilege();
File file = new File(System.getProperty("hadoop.log.dir"), filename);
PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
@@ -566,6 +596,9 @@ public class FSNamesystem implements FSC
out.flush();
out.close();
+ } finally {
+ writeUnlock();
+ }
}
long getDefaultBlockSize() {
@@ -596,8 +629,10 @@ public class FSNamesystem implements FSC
* @param datanode on which blocks are located
* @param size total size of blocks
*/
- synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+ BlocksWithLocations getBlocks(DatanodeID datanode, long size)
throws IOException {
+ readLock();
+ try {
checkSuperuserPrivilege();
DatanodeDescriptor node = getDatanode(datanode);
@@ -638,6 +673,9 @@ public class FSNamesystem implements FSC
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -674,9 +712,11 @@ public class FSNamesystem implements FSC
* Set permissions for an existing file.
* @throws IOException
*/
- public synchronized void setPermission(String src, FsPermission permission)
+ public void setPermission(String src, FsPermission permission)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot set permission for " + src, safeMode);
checkOwner(src);
@@ -688,15 +728,20 @@ public class FSNamesystem implements FSC
Server.getRemoteIp(),
"setPermission", src, null, stat);
}
+ } finally {
+ writeUnlock();
+ }
}
/**
* Set owner for an existing file.
* @throws IOException
*/
- public synchronized void setOwner(String src, String username, String group)
+ public void setOwner(String src, String username, String group)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot set owner for " + src, safeMode);
FSPermissionChecker pc = checkOwner(src);
@@ -717,6 +762,9 @@ public class FSNamesystem implements FSC
Server.getRemoteIp(),
"setOwner", src, null, stat);
}
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -768,25 +816,53 @@ public class FSNamesystem implements FSC
return ret;
}
- private synchronized LocatedBlocks getBlockLocationsInternal(String src,
+ private LocatedBlocks getBlockLocationsInternal(String src,
long offset,
long length,
boolean doAccessTime,
boolean needBlockToken)
throws FileNotFoundException, UnresolvedLinkException, IOException {
- INodeFile inode = dir.getFileINode(src);
- if (inode == null)
- throw new FileNotFoundException("File does not exist: " + src);
- assert !inode.isLink();
- if (doAccessTime && isAccessTimeSupported()) {
- dir.setTimes(src, inode, -1, now(), false);
+
+ for (int attempt = 0; attempt < 2; attempt++) {
+ if (attempt == 0) { // first attempt is with readlock
+ readLock();
+ } else { // second attempt is with write lock
+ writeLock(); // writelock is needed to set accesstime
+ }
+ try {
+ long now = now();
+ INodeFile inode = dir.getFileINode(src);
+ if (inode == null) {
+ throw new FileNotFoundException("File does not exist: " + src);
+ }
+ assert !inode.isLink();
+ if (doAccessTime && isAccessTimeSupported()) {
+ if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
+ // if we have to set access time but we only have the readlock, then
+ // restart this entire operation with the writeLock.
+ if (attempt == 0) {
+ continue;
+ }
+ }
+ dir.setTimes(src, inode, -1, now, false);
+ }
+ return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+ } finally {
+ if (attempt == 0) {
+ readUnlock();
+ } else {
+ writeUnlock();
+ }
+ }
}
- return getBlockLocationsInternal(inode, offset, length, needBlockToken);
+ return null; // can never reach here
}
- synchronized LocatedBlocks getBlockLocationsInternal(INodeFile inode,
+ LocatedBlocks getBlockLocationsInternal(INodeFile inode,
long offset, long length, boolean needBlockToken)
throws IOException {
+ readLock();
+ try {
final BlockInfo[] blocks = inode.getBlocks();
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -801,33 +877,27 @@ public class FSNamesystem implements FSC
} else {
final long n = inode.computeFileSize(false);
final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
- blocks, offset, length, Integer.MAX_VALUE, needBlockToken);
+ blocks, offset, length, Integer.MAX_VALUE);
final BlockInfo last = inode.getLastBlock();
if (LOG.isDebugEnabled()) {
LOG.debug("last = " + last);
}
+
+ if(isBlockTokenEnabled && needBlockToken) {
+ setBlockTokens(locatedblocks);
+ }
+
if (last.isComplete()) {
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n-last.getNumBytes(), needBlockToken), true);
+ blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
} else {
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- blockManager.getBlockLocation(last, n, needBlockToken), false);
+ blockManager.getBlockLocation(last, n), false);
}
}
- }
-
- /** Create a LocatedBlock.
- * @param needBlockToken */
- LocatedBlock createLocatedBlock(final Block b,
- final DatanodeInfo[] locations, final long offset, final boolean corrupt,
- boolean needBlockToken) throws IOException {
- final ExtendedBlock blk = getExtendedBlock(b);
- final LocatedBlock lb = new LocatedBlock(blk, locations, offset, corrupt);
- if (isBlockTokenEnabled && needBlockToken) {
- lb.setBlockToken(blockTokenSecretManager.generateToken(blk,
- EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+ } finally {
+ readUnlock();
}
- return lb;
}
/** Create a LocatedBlock. */
@@ -836,6 +906,17 @@ public class FSNamesystem implements FSC
return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
}
+ /** Generate block tokens for the blocks to be returned. */
+ private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
+ for(LocatedBlock l : locatedBlocks) {
+ Token<BlockTokenIdentifier> token =
+ blockTokenSecretManager.generateToken(l.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
+
+ l.setBlockToken(token);
+ }
+ }
+
/**
* Moves all the blocks from srcs and appends them to trg
* To avoid rollbacks we will verify validity of ALL of the args
@@ -874,7 +955,8 @@ public class FSNamesystem implements FSC
}
}
- synchronized(this) {
+ writeLock();
+ try {
// write permission for the target
if (isPermissionEnabled) {
checkPathAccess(target, FsAction.WRITE);
@@ -969,6 +1051,8 @@ public class FSNamesystem implements FSC
}
dir.concatInternal(target,srcs);
+ } finally {
+ writeUnlock();
}
getEditLog().logSync();
@@ -987,12 +1071,14 @@ public class FSNamesystem implements FSC
* The access time is precise up to an hour. The transaction, if needed, is
* written to the edits log but is not flushed.
*/
- public synchronized void setTimes(String src, long mtime, long atime)
+ public void setTimes(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
" Please set dfs.support.accessTime configuration parameter.");
}
+ writeLock();
+ try {
//
// The caller needs to have write access to set access & modification times.
if (isPermissionEnabled) {
@@ -1010,18 +1096,26 @@ public class FSNamesystem implements FSC
} else {
throw new FileNotFoundException("File " + src + " does not exist.");
}
+ } finally {
+ writeUnlock();
+ }
}
/**
* Create a symbolic link.
*/
- public synchronized void createSymlink(String target, String link,
+ public void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ writeLock();
+ try {
if (!createParent) {
verifyParentDir(link);
}
createSymlinkInternal(target, link, dirPerms, createParent);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(link, false);
@@ -1034,9 +1128,11 @@ public class FSNamesystem implements FSC
/**
* Create a symbolic link.
*/
- private synchronized void createSymlinkInternal(String target, String link,
+ private void createSymlinkInternal(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ writeLock();
+ try {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" +
target + " link=" + link);
@@ -1060,6 +1156,9 @@ public class FSNamesystem implements FSC
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -1087,9 +1186,11 @@ public class FSNamesystem implements FSC
return status;
}
- private synchronized boolean setReplicationInternal(String src,
+ private boolean setReplicationInternal(String src,
short replication) throws AccessControlException, QuotaExceededException,
SafeModeException, UnresolvedLinkException, IOException {
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot set replication for " + src, safeMode);
blockManager.verifyReplication(src, replication, null);
@@ -1121,6 +1222,9 @@ public class FSNamesystem implements FSC
+ ". New replication is " + replication);
}
return true;
+ } finally {
+ writeUnlock();
+ }
}
long getPreferredBlockSize(String filename)
@@ -1173,14 +1277,29 @@ public class FSNamesystem implements FSC
}
/**
- * For description of exceptions @see {@link ClientProtocol#create()}
+ * Create new or open an existing file for append.<p>
+ *
+ * In case of opening the file for append, the method returns the last
+ * block of the file if this is a partial block, which can still be used
+ * for writing more data. The client uses the returned block locations
+ * to form the data pipeline for this block.<br>
+ * The method returns null if the last block is full or if this is a
+ * new file. The client then allocates a new block with the next call
+ * using {@link NameNode#addBlock()}.<p>
+ *
+ * For description of parameters and exceptions thrown see
+ * {@link ClientProtocol#create()}
+ *
+ * @return the last block locations if the block is partial or null otherwise
*/
- private synchronized void startFileInternal(String src,
+ private LocatedBlock startFileInternal(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
+ writeLock();
+ try {
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean append = flag.contains(CreateFlag.APPEND);
boolean create = flag.contains(CreateFlag.CREATE);
@@ -1290,9 +1409,8 @@ public class FSNamesystem implements FSC
+ src + " on client " + clientMachine);
else {
// append & create on a nonexistent file is equivalent to overwrite
- this.startFileInternal(src, permissions, holder, clientMachine,
+ return startFileInternal(src, permissions, holder, clientMachine,
EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
- return;
}
} else if (myFile.isDirectory()) {
throw new IOException("failed to append to directory " + src
@@ -1330,6 +1448,15 @@ public class FSNamesystem implements FSC
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.getClientName(), src);
+ // convert last block to under-construction
+ LocatedBlock lb =
+ blockManager.convertLastBlockToUnderConstruction(cons);
+
+ if (lb != null && isBlockTokenEnabled) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+ }
+ return lb;
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@@ -1355,6 +1482,10 @@ public class FSNamesystem implements FSC
+ie.getMessage());
throw ie;
}
+ } finally {
+ writeUnlock();
+ }
+ return null;
}
/**
@@ -1368,52 +1499,12 @@ public class FSNamesystem implements FSC
throw new UnsupportedOperationException("Append to hdfs not supported." +
" Please refer to dfs.support.append configuration parameter.");
}
- startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND),
- false, (short)blockManager.maxReplication, (long)0);
+ LocatedBlock lb =
+ startFileInternal(src, null, holder, clientMachine,
+ EnumSet.of(CreateFlag.APPEND),
+ false, (short)blockManager.maxReplication, (long)0);
getEditLog().logSync();
- //
- // Create a LocatedBlock object for the last block of the file
- // to be returned to the client. Return null if the file does not
- // have a partial block at the end.
- //
- LocatedBlock lb = null;
- synchronized (this) {
- INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
- BlockInfo lastBlock = file.getLastBlock();
- if (lastBlock != null) {
- assert lastBlock == blockManager.getStoredBlock(lastBlock) :
- "last block of the file is not in blocksMap";
- if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
- long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
- // remove the replica locations of this block from the node
- for (int i = 0; i < targets.length; i++) {
- targets[i].removeBlock(lastBlock);
- }
- // convert last block to under-construction and set its locations
- blockManager.convertLastBlockToUnderConstruction(file, targets);
-
- lb = new LocatedBlock(getExtendedBlock(lastBlock), targets,
- fileLength-lastBlock.getNumBytes());
- if (isBlockTokenEnabled) {
- lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
- EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
- }
-
- // Remove block from replication queue.
- blockManager.updateNeededReplications(lastBlock, 0, 0);
-
- // remove this block from the list of pending blocks to be deleted.
- // This reduces the possibility of triggering HADOOP-1349.
- //
- for (DatanodeDescriptor dd : targets) {
- String datanodeId = dd.getStorageID();
- blockManager.removeFromInvalidates(datanodeId, lastBlock);
- }
- }
- }
- }
if (lb != null) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1466,7 +1557,8 @@ public class FSNamesystem implements FSC
+src+" for "+clientName);
}
- synchronized (this) {
+ writeLock();
+ try {
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src, safeMode);
}
@@ -1490,6 +1582,8 @@ public class FSNamesystem implements FSC
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = (int)pendingFile.getReplication();
+ } finally {
+ writeUnlock();
}
// choose targets for the new block to be allocated.
@@ -1502,7 +1596,8 @@ public class FSNamesystem implements FSC
}
// Allocate a new block and record it in the INode.
- synchronized (this) {
+ writeLock();
+ try {
INode[] pathINodes = dir.getExistingPathINodes(src);
int inodesLen = pathINodes.length;
checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1519,6 +1614,8 @@ public class FSNamesystem implements FSC
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
+ } finally {
+ writeUnlock();
}
// Create next block
@@ -1533,10 +1630,11 @@ public class FSNamesystem implements FSC
/**
* The client would like to let go of the given block
*/
- public synchronized boolean abandonBlock(ExtendedBlock b, String src, String holder)
+ public boolean abandonBlock(ExtendedBlock b, String src, String holder)
throws LeaseExpiredException, FileNotFoundException,
UnresolvedLinkException, IOException {
- checkBlock(b);
+ writeLock();
+ try {
//
// Remove the block from the pending creates list
//
@@ -1552,6 +1650,9 @@ public class FSNamesystem implements FSC
+ " is removed from pendingCreates");
}
return true;
+ } finally {
+ writeUnlock();
+ }
}
// make sure that we still have the lease on this file.
@@ -1603,9 +1704,11 @@ public class FSNamesystem implements FSC
return success ;
}
- private synchronized boolean completeFileInternal(String src,
+ private boolean completeFileInternal(String src,
String holder, Block last) throws SafeModeException,
UnresolvedLinkException, IOException {
+ writeLock();
+ try {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
@@ -1626,6 +1729,9 @@ public class FSNamesystem implements FSC
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
+ " is closed by " + holder);
return true;
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -1670,7 +1776,9 @@ public class FSNamesystem implements FSC
* replicated. If not, return false. If checkall is true, then check
* all blocks, otherwise check only penultimate block.
*/
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+ boolean checkFileProgress(INodeFile v, boolean checkall) {
+ writeLock();
+ try {
if (checkall) {
//
// check all blocks of the file.
@@ -1696,6 +1804,9 @@ public class FSNamesystem implements FSC
}
}
return true;
+ } finally {
+ writeUnlock();
+ }
}
@@ -1704,10 +1815,14 @@ public class FSNamesystem implements FSC
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
*/
- public synchronized void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
+ public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
throws IOException {
- checkBlock(blk);
- blockManager.findAndMarkBlockAsCorrupt(ExtendedBlock.getLocalBlock(blk), dn);
+ writeLock();
+ try {
+ blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
+ } finally {
+ writeUnlock();
+ }
}
@@ -1742,8 +1857,11 @@ public class FSNamesystem implements FSC
/** @deprecated See {@link #renameTo(String, String)} */
@Deprecated
- private synchronized boolean renameToInternal(String src, String dst)
+ private boolean renameToInternal(String src, String dst)
throws IOException, UnresolvedLinkException {
+
+ writeLock();
+ try {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
" to " + dst);
@@ -1769,6 +1887,9 @@ public class FSNamesystem implements FSC
return true;
}
return false;
+ } finally {
+ writeUnlock();
+ }
}
@@ -1788,8 +1909,10 @@ public class FSNamesystem implements FSC
}
}
- private synchronized void renameToInternal(String src, String dst,
+ private void renameToInternal(String src, String dst,
Options.Rename... options) throws IOException {
+ writeLock();
+ try {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
@@ -1808,6 +1931,9 @@ public class FSNamesystem implements FSC
HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
dir.renameTo(src, dst, options);
changeLease(src, dst, dinfo); // update lease with new filename
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -1850,7 +1976,9 @@ public class FSNamesystem implements FSC
UnresolvedLinkException, IOException{
boolean deleteNow = false;
ArrayList<Block> collectedBlocks = new ArrayList<Block>();
- synchronized(this) {
+
+ writeLock();
+ try {
if (isInSafeMode()) {
throw new SafeModeException("Cannot delete " + src, safeMode);
}
@@ -1865,6 +1993,8 @@ public class FSNamesystem implements FSC
if (deleteNow) { // Perform small deletes right away
removeBlocks(collectedBlocks);
}
+ } finally {
+ writeUnlock();
}
// Log directory deletion to editlog
getEditLog().logSync();
@@ -1886,10 +2016,13 @@ public class FSNamesystem implements FSC
while (start < blocks.size()) {
end = BLOCK_DELETION_INCREMENT + start;
end = end > blocks.size() ? blocks.size() : end;
- synchronized(this) {
+ writeLock();
+ try {
for (int i=start; i<end; i++) {
blockManager.removeBlock(blocks.get(i));
}
+ } finally {
+ writeUnlock();
}
start = end;
}
@@ -1946,9 +2079,11 @@ public class FSNamesystem implements FSC
/**
* Create all the necessary directories
*/
- private synchronized boolean mkdirsInternal(String src,
+ private boolean mkdirsInternal(String src,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
+ writeLock();
+ try {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
@@ -1982,6 +2117,9 @@ public class FSNamesystem implements FSC
throw new IOException("Failed to create directory: " + src);
}
return true;
+ } finally {
+ writeUnlock();
+ }
}
ContentSummary getContentSummary(String src) throws AccessControlException,
@@ -2018,12 +2156,15 @@ public class FSNamesystem implements FSC
NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+ src + " for " + clientName);
- synchronized (this) {
+ writeLock();
+ try {
if (isInSafeMode()) {
throw new SafeModeException("Cannot fsync file " + src, safeMode);
}
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
dir.persistBlocks(src, pendingFile);
+ } finally {
+ writeUnlock();
}
}
@@ -2190,10 +2331,13 @@ public class FSNamesystem implements FSC
checkReplicationFactor(newFile);
}
- synchronized void commitBlockSynchronization(ExtendedBlock lastblock,
+ void commitBlockSynchronization(ExtendedBlock lastblock,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
throws IOException, UnresolvedLinkException {
+ String src = "";
+ writeLock();
+ try {
LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
@@ -2256,7 +2400,7 @@ public class FSNamesystem implements FSC
// If this commit does not want to close the file, persist
// blocks only if append is supported and return
- String src = leaseManager.findPath(pendingFile);
+ src = leaseManager.findPath(pendingFile);
if (!closeFile) {
if (supportAppends) {
dir.persistBlocks(src, pendingFile);
@@ -2271,6 +2415,9 @@ public class FSNamesystem implements FSC
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ ", file=" + src
@@ -2348,8 +2495,10 @@ public class FSNamesystem implements FSC
*
* @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
*/
- public synchronized void registerDatanode(DatanodeRegistration nodeReg
+ public void registerDatanode(DatanodeRegistration nodeReg
) throws IOException {
+ writeLock();
+ try {
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
@@ -2463,6 +2612,9 @@ public class FSNamesystem implements FSC
// because it is done when the descriptor is created
}
return;
+ } finally {
+ writeUnlock();
+ }
}
/* Resolve a node's network location */
@@ -2721,11 +2873,13 @@ public class FSNamesystem implements FSC
workFound = blockManager.computeReplicationWork(blocksToProcess);
// Update FSNamesystemMetrics counters
- synchronized (this) {
+ writeLock();
+ try {
blockManager.updateState();
blockManager.scheduledReplicationBlocksCount = workFound;
+ } finally {
+ writeUnlock();
}
-
workFound += blockManager.computeInvalidateWork(nodesToProcess);
return workFound;
}
@@ -2738,8 +2892,10 @@ public class FSNamesystem implements FSC
* remove a datanode descriptor
* @param nodeID datanode ID
*/
- synchronized public void removeDatanode(DatanodeID nodeID)
+ public void removeDatanode(DatanodeID nodeID)
throws IOException {
+ writeLock();
+ try {
DatanodeDescriptor nodeInfo = getDatanode(nodeID);
if (nodeInfo != null) {
removeDatanode(nodeInfo);
@@ -2747,6 +2903,9 @@ public class FSNamesystem implements FSC
NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+ nodeID.getName() + " does not exist");
}
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -2849,7 +3008,8 @@ public class FSNamesystem implements FSC
// acquire the fsnamesystem lock, and then remove the dead node.
if (foundDead) {
- synchronized (this) {
+ writeLock();
+ try {
synchronized(heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeInfo = null;
@@ -2865,6 +3025,8 @@ public class FSNamesystem implements FSC
}
}
}
+ } finally {
+ writeUnlock();
}
}
allAlive = !foundDead;
@@ -2875,9 +3037,11 @@ public class FSNamesystem implements FSC
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
- public synchronized void processReport(DatanodeID nodeID, String poolId,
+ public void processReport(DatanodeID nodeID, String poolId,
BlockListAsLongs newReport) throws IOException {
- checkPoolId(poolId);
+
+ writeLock();
+ try {
long startTime = now();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2898,6 +3062,9 @@ public class FSNamesystem implements FSC
blockManager.processReport(node, newReport);
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -3005,12 +3172,13 @@ public class FSNamesystem implements FSC
/**
* The given node is reporting that it received a certain block.
*/
- public synchronized void blockReceived(DatanodeID nodeID,
+ public void blockReceived(DatanodeID nodeID,
String poolId,
Block block,
String delHint
) throws IOException {
- checkPoolId(poolId);
+ writeLock();
+ try {
DatanodeDescriptor node = getDatanode(nodeID);
if (node == null || !node.isAlive) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
@@ -3031,6 +3199,9 @@ public class FSNamesystem implements FSC
}
blockManager.addBlock(node, block, delHint);
+ } finally {
+ writeUnlock();
+ }
}
private void checkPoolId(String thatPoolId) throws IOException {
@@ -3130,16 +3301,17 @@ public class FSNamesystem implements FSC
return getDatanodeListForReport(type).size();
}
- private synchronized ArrayList<DatanodeDescriptor> getDatanodeListForReport(
- DatanodeReportType type) {
-
+ private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
+ DatanodeReportType type) {
boolean listLiveNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.LIVE;
boolean listDeadNodes = type == DatanodeReportType.ALL ||
type == DatanodeReportType.DEAD;
HashMap<String, String> mustList = new HashMap<String, String>();
-
+
+ readLock();
+ try {
if (listDeadNodes) {
//first load all the nodes listed in include and exclude files.
for (Iterator<String> it = hostsReader.getHosts().iterator();
@@ -3182,10 +3354,15 @@ public class FSNamesystem implements FSC
}
return nodes;
+ } finally {
+ readUnlock();
+ }
}
- public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
+ public DatanodeInfo[] datanodeReport( DatanodeReportType type
) throws AccessControlException {
+ readLock();
+ try {
checkSuperuserPrivilege();
ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
@@ -3194,6 +3371,9 @@ public class FSNamesystem implements FSC
arr[i] = new DatanodeInfo(results.get(i));
}
return arr;
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -3204,7 +3384,9 @@ public class FSNamesystem implements FSC
* @throws AccessControlException if superuser privilege is violated.
* @throws IOException if the namesystem is not in safe mode
*/
- synchronized void saveNamespace() throws AccessControlException, IOException {
+ void saveNamespace() throws AccessControlException, IOException {
+ writeLock();
+ try {
checkSuperuserPrivilege();
if(!isInSafeMode()) {
throw new IOException("Safe mode should be turned ON " +
@@ -3212,6 +3394,9 @@ public class FSNamesystem implements FSC
}
getFSImage().saveNamespace(true);
LOG.info("New namespace image has been created.");
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -3220,7 +3405,9 @@ public class FSNamesystem implements FSC
*
* @throws AccessControlException if superuser privilege is violated.
*/
- synchronized boolean restoreFailedStorage(String arg) throws AccessControlException {
+ boolean restoreFailedStorage(String arg) throws AccessControlException {
+ writeLock();
+ try {
checkSuperuserPrivilege();
// if it is disabled - enable it and vice versa.
@@ -3231,13 +3418,17 @@ public class FSNamesystem implements FSC
getFSImage().setRestoreFailedStorage(val);
return val;
+ } finally {
+ writeUnlock();
+ }
}
/**
*/
- public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
+ public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
ArrayList<DatanodeDescriptor> dead) {
-
+ readLock();
+ try {
ArrayList<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
@@ -3247,12 +3438,17 @@ public class FSNamesystem implements FSC
else
live.add(node);
}
+ } finally {
+ readUnlock();
+ }
}
/**
* Prints information about all datanodes.
*/
- private synchronized void datanodeDump(PrintWriter out) {
+ private void datanodeDump(PrintWriter out) {
+ readLock();
+ try {
synchronized (datanodeMap) {
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
@@ -3260,6 +3456,9 @@ public class FSNamesystem implements FSC
out.println(node.dumpDatanode());
}
}
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -3402,7 +3601,8 @@ public class FSNamesystem implements FSC
hostsReader.updateFileNames(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude", ""));
hostsReader.refresh();
- synchronized (this) {
+ writeLock();
+ try {
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
it.hasNext();) {
DatanodeDescriptor node = it.next();
@@ -3423,8 +3623,9 @@ public class FSNamesystem implements FSC
}
}
}
- }
-
+ } finally {
+ writeUnlock();
+ }
}
void finalizeUpgrade() throws IOException {
@@ -3440,8 +3641,9 @@ public class FSNamesystem implements FSC
* Returns TRUE if node is registered (including when it is on the
* exclude list and is being decommissioned).
*/
- private synchronized boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr)
+ private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr)
throws IOException {
+ assert (hasWriteLock());
if (!inHostsList(nodeReg, ipAddr)) {
return false;
}
@@ -3939,7 +4141,9 @@ public class FSNamesystem implements FSC
* Enter safe mode manually.
* @throws IOException
*/
- synchronized void enterSafeMode() throws IOException {
+ void enterSafeMode() throws IOException {
+ writeLock();
+ try {
// Ensure that any concurrent operations have been fully synced
// before entering safe mode. This ensures that the FSImage
// is entirely stable on disk as soon as we're in safe mode.
@@ -3951,13 +4155,18 @@ public class FSNamesystem implements FSC
safeMode.setManual();
NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
+ safeMode.getTurnOffTip());
+ } finally {
+ writeUnlock();
+ }
}
/**
* Leave safe mode.
* @throws IOException
*/
- synchronized void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+ void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
+ writeLock();
+ try {
if (!isInSafeMode()) {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
return;
@@ -3966,50 +4175,78 @@ public class FSNamesystem implements FSC
throw new SafeModeException("Distributed upgrade is in progress",
safeMode);
safeMode.leave(checkForUpgrades);
+ } finally {
+ writeUnlock();
+ }
}
- synchronized String getSafeModeTip() {
+ String getSafeModeTip() {
+ readLock();
+ try {
if (!isInSafeMode())
return "";
return safeMode.getTurnOffTip();
+ } finally {
+ readUnlock();
+ }
}
long getEditLogSize() throws IOException {
return getEditLog().getEditLogSize();
}
- synchronized CheckpointSignature rollEditLog() throws IOException {
+ CheckpointSignature rollEditLog() throws IOException {
+ writeLock();
+ try {
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not created",
safeMode);
}
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
return getFSImage().rollEditLog();
+ } finally {
+ writeUnlock();
+ }
}
- synchronized void rollFSImage() throws IOException {
+ void rollFSImage() throws IOException {
+ writeLock();
+ try {
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not created",
safeMode);
}
LOG.info("Roll FSImage from " + Server.getRemoteAddress());
getFSImage().rollFSImage();
+ } finally {
+ writeUnlock();
+ }
}
- synchronized NamenodeCommand startCheckpoint(
+ NamenodeCommand startCheckpoint(
NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
+ writeLock();
+ try {
LOG.info("Start checkpoint for " + bnReg.getAddress());
NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
getEditLog().logSync();
return cmd;
+ } finally {
+ writeUnlock();
+ }
}
- synchronized void endCheckpoint(NamenodeRegistration registration,
+ void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
+ writeLock();
+ try {
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig, registration.getRole());
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -4098,8 +4335,13 @@ public class FSNamesystem implements FSC
fsOwner.getShortUserName(), supergroup);
if (!pc.isSuper) {
dir.waitForReady();
- pc.checkPermission(path, dir.rootDir, doCheckOwner,
- ancestorAccess, parentAccess, access, subAccess);
+ readLock();
+ try {
+ pc.checkPermission(path, dir.rootDir, doCheckOwner,
+ ancestorAccess, parentAccess, access, subAccess);
+ } finally {
+ readUnlock();
+ }
}
return pc;
}
@@ -4301,9 +4543,10 @@ public class FSNamesystem implements FSC
* @return a located block with a new generation stamp and an access token
* @throws IOException if any error occurs
*/
- synchronized LocatedBlock updateBlockForPipeline(ExtendedBlock block,
+ LocatedBlock updateBlockForPipeline(ExtendedBlock block,
String clientName) throws IOException {
- checkBlock(block);
+ writeLock();
+ try {
// check vadility of parameters
checkUCBlock(block, clientName);
@@ -4315,6 +4558,9 @@ public class FSNamesystem implements FSC
block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return locatedBlock;
+ } finally {
+ writeUnlock();
+ }
}
@@ -4327,12 +4573,11 @@ public class FSNamesystem implements FSC
* @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs
*/
- synchronized void updatePipeline(String clientName, ExtendedBlock oldBlock,
+ void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
- checkBlock(oldBlock);
- checkBlock(newBlock);
-
+ writeLock();
+ try {
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
LOG.info("updatePipeline(block=" + oldBlock
@@ -4379,6 +4624,9 @@ public class FSNamesystem implements FSC
}
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
return;
+ } finally {
+ writeUnlock();
+ }
}
// rename was successful. If any part of the renamed subtree had
@@ -4447,8 +4695,10 @@ public class FSNamesystem implements FSC
* @param registration
* @throws IOException
*/
- synchronized void registerBackupNode(NamenodeRegistration registration)
+ void registerBackupNode(NamenodeRegistration registration)
throws IOException {
+ writeLock();
+ try {
if(getFSImage().getNamespaceID() != registration.getNamespaceID())
throw new IOException("Incompatible namespaceIDs: "
+ " Namenode namespaceID = " + getFSImage().getNamespaceID()
@@ -4458,6 +4708,9 @@ public class FSNamesystem implements FSC
if(!regAllowed)
throw new IOException("Registration is not allowed. " +
"Another node is registered as a backup.");
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -4467,14 +4720,19 @@ public class FSNamesystem implements FSC
* @param registration
* @throws IOException
*/
- synchronized void releaseBackupNode(NamenodeRegistration registration)
+ void releaseBackupNode(NamenodeRegistration registration)
throws IOException {
+ writeLock();
+ try {
if(getFSImage().getNamespaceID() != registration.getNamespaceID())
throw new IOException("Incompatible namespaceIDs: "
+ " Namenode namespaceID = " + getFSImage().getNamespaceID()
+ "; " + registration.getRole() +
" node namespaceID = " + registration.getNamespaceID());
getEditLog().releaseBackupStream(registration);
+ } finally {
+ writeUnlock();
+ }
}
public int numCorruptReplicas(Block blk) {
@@ -4527,9 +4785,11 @@ public class FSNamesystem implements FSC
* @throws AccessControlException
* @throws IOException
*/
- synchronized Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
+ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
String startBlockAfter) throws AccessControlException, IOException {
+ readLock();
+ try {
checkSuperuserPrivilege();
long startBlockId = 0;
// print a limited # of corrupt files per call
@@ -4556,9 +4816,14 @@ public class FSNamesystem implements FSC
}
LOG.info("list corrupt file blocks returned: " + count);
return corruptFiles;
+ } finally {
+ readUnlock();
+ }
}
- public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+ public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+ readLock();
+ try {
ArrayList<DatanodeDescriptor> decommissioningNodes =
new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> results =
@@ -4570,6 +4835,9 @@ public class FSNamesystem implements FSC
}
}
return decommissioningNodes;
+ } finally {
+ readUnlock();
+ }
}
/*
@@ -4698,8 +4966,11 @@ public class FSNamesystem implements FSC
*/
private void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) throws IOException {
- synchronized (this) {
+ writeLock();
+ try {
getEditLog().logGetDelegationToken(id, expiryTime);
+ } finally {
+ writeUnlock();
}
getEditLog().logSync();
}
@@ -4712,8 +4983,11 @@ public class FSNamesystem implements FSC
*/
private void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) throws IOException {
- synchronized (this) {
+ writeLock();
+ try {
getEditLog().logRenewDelegationToken(id, expiryTime);
+ } finally {
+ writeUnlock();
}
getEditLog().logSync();
}
@@ -4726,8 +5000,11 @@ public class FSNamesystem implements FSC
*/
private void logCancelDelegationToken(DelegationTokenIdentifier id)
throws IOException {
- synchronized (this) {
+ writeLock();
+ try {
getEditLog().logCancelDelegationToken(id);
+ } finally {
+ writeUnlock();
}
getEditLog().logSync();
}
@@ -4738,8 +5015,11 @@ public class FSNamesystem implements FSC
* @param key new delegation key.
*/
public void logUpdateMasterKey(DelegationKey key) throws IOException {
- synchronized (this) {
+ writeLock();
+ try {
getEditLog().logUpdateMasterKey(key);
+ } finally {
+ writeUnlock();
}
getEditLog().logSync();
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Fri Sep 17 19:51:27 2010
@@ -120,7 +120,6 @@ class FSPermissionChecker {
+ ", subAccess=" + subAccess);
}
// check if (parentAccess != null) && file exists, then check sb
- synchronized(root) {
// Resolve symlinks, the check is performed on the link target.
INode[] inodes = root.getExistingPathINodes(path, true);
int ancestorIndex = inodes.length - 2;
@@ -147,7 +146,6 @@ class FSPermissionChecker {
if (doCheckOwner) {
checkOwner(inodes[inodes.length - 1]);
}
- }
}
private void checkOwner(INode inode) throws AccessControlException {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Fri Sep 17 19:51:27 2010
@@ -120,9 +120,9 @@ public class FileDataServlet extends Dfs
response.getWriter().println(e.toString());
}
} else if (info == null) {
- response.sendError(400, "cat: File not found " + path);
+ response.sendError(400, "File not found " + path);
} else {
- response.sendError(400, "cat: " + path + ": is a directory");
+ response.sendError(400, path + ": is a directory");
}
return null;
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Sep 17 19:51:27 2010
@@ -366,12 +366,16 @@ public class LeaseManager {
/** Check leases periodically. */
public void run() {
for(; fsnamesystem.isRunning(); ) {
- synchronized(fsnamesystem) {
+ fsnamesystem.writeLock();
+ try {
if (!fsnamesystem.isInSafeMode()) {
checkLeases();
}
+ } finally {
+ fsnamesystem.writeUnlock();
}
+
try {
Thread.sleep(2000);
} catch(InterruptedException ie) {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Fri Sep 17 19:51:27 2010
@@ -131,9 +131,11 @@ public class ListPathsServlet extends Df
throws ServletException, IOException {
final PrintWriter out = response.getWriter();
final XMLOutputter doc = new XMLOutputter(out, "UTF-8");
+
+ final Map<String, String> root = buildRoot(request, doc);
+ final String path = root.get("path");
+
try {
- final Map<String, String> root = buildRoot(request, doc);
- final String path = root.get("path");
final boolean recur = "yes".equals(root.get("recursive"));
final Pattern filter = Pattern.compile(root.get("filter"));
final Pattern exclude = Pattern.compile(root.get("exclude"));
@@ -191,14 +193,18 @@ public class ListPathsServlet extends Df
writeXml(re, p, doc);
}
}
- doc.endDocument();
return null;
}
});
+ } catch(IOException ioe) {
+ writeXml(ioe, path, doc);
} catch (InterruptedException e) {
LOG.warn("ListPathServlet encountered InterruptedException", e);
response.sendError(400, e.getMessage());
} finally {
+ if (doc != null) {
+ doc.endDocument();
+ }
if (out != null) {
out.close();
}
Propchange: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/test/hdfs:776175-785643
/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:987665-997035
+/hadoop/hdfs/trunk/src/test/hdfs:987665-998256
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Sep 17 19:51:27 2010
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.FileChannel;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
@@ -56,6 +57,7 @@ import org.apache.hadoop.net.DNSToSwitch
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.util.StringUtils;
@@ -879,6 +881,22 @@ public class MiniDFSCluster {
}
/**
+ * @return a {@link HftpFileSystem} object as specified user.
+ */
+ public HftpFileSystem getHftpFileSystemAs(final String username,
+ final Configuration conf, final String... groups
+ ) throws IOException, InterruptedException {
+ final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+ username, groups);
+ return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
+ @Override
+ public HftpFileSystem run() throws Exception {
+ return getHftpFileSystem();
+ }
+ });
+ }
+
+ /**
* Get the directories where the namenode stores its image.
*/
public Collection<URI> getNameDirs() {
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Sep 17 19:51:27 2010
@@ -18,7 +18,10 @@
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -179,90 +182,6 @@ public class TestDistributedFileSystem {
}
@Test
- public void testFileChecksum() throws IOException {
- ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
-
- final long seed = RAN.nextLong();
- System.out.println("seed=" + seed);
- RAN.setSeed(seed);
-
- final Configuration conf = getTestConfiguration();
- conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
-
- final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
- final FileSystem hdfs = cluster.getFileSystem();
- final String hftpuri = "hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
- System.out.println("hftpuri=" + hftpuri);
- final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
-
- final String dir = "/filechecksum";
- final int block_size = 1024;
- final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
-
- //try different number of blocks
- for(int n = 0; n < 5; n++) {
- //generate random data
- final byte[] data = new byte[RAN.nextInt(block_size/2-1)+n*block_size+1];
- RAN.nextBytes(data);
- System.out.println("data.length=" + data.length);
-
- //write data to a file
- final Path foo = new Path(dir, "foo" + n);
- {
- final FSDataOutputStream out = hdfs.create(foo, false, buffer_size,
- (short)2, block_size);
- out.write(data);
- out.close();
- }
-
- //compute checksum
- final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
- System.out.println("hdfsfoocs=" + hdfsfoocs);
-
- final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
- System.out.println("hftpfoocs=" + hftpfoocs);
-
- final Path qualified = new Path(hftpuri + dir, "foo" + n);
- final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
- System.out.println("qfoocs=" + qfoocs);
-
- //write another file
- final Path bar = new Path(dir, "bar" + n);
- {
- final FSDataOutputStream out = hdfs.create(bar, false, buffer_size,
- (short)2, block_size);
- out.write(data);
- out.close();
- }
-
- { //verify checksum
- final FileChecksum barcs = hdfs.getFileChecksum(bar);
- final int barhashcode = barcs.hashCode();
- assertEquals(hdfsfoocs.hashCode(), barhashcode);
- assertEquals(hdfsfoocs, barcs);
-
- assertEquals(hftpfoocs.hashCode(), barhashcode);
- assertEquals(hftpfoocs, barcs);
-
- assertEquals(qfoocs.hashCode(), barhashcode);
- assertEquals(qfoocs, barcs);
- }
- }
- cluster.shutdown();
- }
-
- @Test
- public void testAllWithDualPort() throws Exception {
- dualPortTesting = true;
-
- testFileSystemCloseAll();
- testDFSClose();
- testDFSClient();
- testFileChecksum();
- }
-
- @Test
public void testStatistics() throws Exception {
int lsLimit = 2;
final Configuration conf = getTestConfiguration();
@@ -359,4 +278,100 @@ public class TestDistributedFileSystem {
assertEquals(writeOps, DFSTestUtil.getStatistics(fs).getWriteOps());
assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
}
+
+ @Test
+ public void testFileChecksum() throws Exception {
+ ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
+
+ final long seed = RAN.nextLong();
+ System.out.println("seed=" + seed);
+ RAN.setSeed(seed);
+
+ final Configuration conf = getTestConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+ final FileSystem hdfs = cluster.getFileSystem();
+ final String hftpuri = "hftp://" + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ System.out.println("hftpuri=" + hftpuri);
+ final FileSystem hftp = new Path(hftpuri).getFileSystem(conf);
+
+ final String dir = "/filechecksum";
+ final int block_size = 1024;
+ final int buffer_size = conf.getInt("io.file.buffer.size", 4096);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
+
+ //try different number of blocks
+ for(int n = 0; n < 5; n++) {
+ //generate random data
+ final byte[] data = new byte[RAN.nextInt(block_size/2-1)+n*block_size+1];
+ RAN.nextBytes(data);
+ System.out.println("data.length=" + data.length);
+
+ //write data to a file
+ final Path foo = new Path(dir, "foo" + n);
+ {
+ final FSDataOutputStream out = hdfs.create(foo, false, buffer_size,
+ (short)2, block_size);
+ out.write(data);
+ out.close();
+ }
+
+ //compute checksum
+ final FileChecksum hdfsfoocs = hdfs.getFileChecksum(foo);
+ System.out.println("hdfsfoocs=" + hdfsfoocs);
+
+ final FileChecksum hftpfoocs = hftp.getFileChecksum(foo);
+ System.out.println("hftpfoocs=" + hftpfoocs);
+
+ final Path qualified = new Path(hftpuri + dir, "foo" + n);
+ final FileChecksum qfoocs = hftp.getFileChecksum(qualified);
+ System.out.println("qfoocs=" + qfoocs);
+
+ //write another file
+ final Path bar = new Path(dir, "bar" + n);
+ {
+ final FSDataOutputStream out = hdfs.create(bar, false, buffer_size,
+ (short)2, block_size);
+ out.write(data);
+ out.close();
+ }
+
+ { //verify checksum
+ final FileChecksum barcs = hdfs.getFileChecksum(bar);
+ final int barhashcode = barcs.hashCode();
+ assertEquals(hdfsfoocs.hashCode(), barhashcode);
+ assertEquals(hdfsfoocs, barcs);
+
+ assertEquals(hftpfoocs.hashCode(), barhashcode);
+ assertEquals(hftpfoocs, barcs);
+
+ assertEquals(qfoocs.hashCode(), barhashcode);
+ assertEquals(qfoocs, barcs);
+ }
+
+ { //test permission error on hftp
+ hdfs.setPermission(new Path(dir), new FsPermission((short)0));
+ try {
+ final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+ final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+ hftp2.getFileChecksum(qualified);
+ fail();
+ } catch(IOException ioe) {
+ FileSystem.LOG.info("GOOD: getting an exception", ioe);
+ }
+ }
+ }
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testAllWithDualPort() throws Exception {
+ dualPortTesting = true;
+
+ testFileSystemCloseAll();
+ testDFSClose();
+ testDFSClient();
+ testFileChecksum();
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java Fri Sep 17 19:51:27 2010
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
@@ -29,17 +34,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
-
-import static org.junit.Assert.*;
-import org.junit.Test;
-import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
* This class tests the FileStatus API.
@@ -184,7 +189,7 @@ public class TestFileStatus {
/** Test FileStatus objects obtained from a directory */
@Test
- public void testGetFileStatusOnDir() throws IOException {
+ public void testGetFileStatusOnDir() throws Exception {
// Create the directory
Path dir = new Path("/test/mkdirs");
assertTrue("mkdir failed", fs.mkdirs(dir));
@@ -284,5 +289,17 @@ public class TestFileStatus {
assertEquals(file2.toString(), itor.next().getPath().toString());
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse(itor.hasNext());
+
+ { //test permission error on hftp
+ fs.setPermission(dir, new FsPermission((short)0));
+ try {
+ final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+ final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, conf, "somegroup");
+ hftp2.getContentSummary(dir);
+ fail();
+ } catch(IOException ioe) {
+ FileSystem.LOG.info("GOOD: getting an exception", ioe);
+ }
+ }
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=998286&r1=998285&r2=998286&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java Fri Sep 17 19:51:27 2010
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -100,6 +102,29 @@ public class TestListPathServlet {
// Non existent path
checkStatus("/nonexistent");
checkStatus("/nonexistent/a");
+
+ final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
+ final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, "somegroup");
+ { //test file not found on hftp
+ final Path nonexistent = new Path("/nonexistent");
+ try {
+ hftp2.getFileStatus(nonexistent);
+ Assert.fail();
+ } catch(IOException ioe) {
+ FileSystem.LOG.info("GOOD: getting an exception", ioe);
+ }
+ }
+
+ { //test permission error on hftp
+ final Path dir = new Path("/dir");
+ fs.setPermission(dir, new FsPermission((short)0));
+ try {
+ hftp2.getFileStatus(new Path(dir, "a"));
+ Assert.fail();
+ } catch(IOException ioe) {
+ FileSystem.LOG.info("GOOD: getting an exception", ioe);
+ }
+ }
}
private void checkStatus(String listdir) throws IOException {
Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/webapps/datanode:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/webapps/datanode:820487
-/hadoop/hdfs/trunk/src/webapps/datanode:987665-997035
+/hadoop/hdfs/trunk/src/webapps/datanode:987665-998256
Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/webapps/hdfs:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs:820487
-/hadoop/hdfs/trunk/src/webapps/hdfs:987665-997035
+/hadoop/hdfs/trunk/src/webapps/hdfs:987665-998256
Propchange: hadoop/hdfs/branches/HDFS-1052/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 17 19:51:27 2010
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/webapps/secondary:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/webapps/secondary:820487
-/hadoop/hdfs/trunk/src/webapps/secondary:987665-997035
+/hadoop/hdfs/trunk/src/webapps/secondary:987665-998256