You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 22:09:57 UTC
svn commit: r1138160 [4/6] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
bin/ src/c++/libhdfs/ src/c++/libhdfs/tests/ src/contrib/
src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/ja...
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jun 21 20:09:54 2011
@@ -17,45 +17,100 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import org.apache.commons.logging.*;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -66,51 +121,33 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.DataOutputStream;
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
/***************************************************
* FSNamesystem does the actual bookkeeping work for the
* DataNode.
@@ -271,8 +308,8 @@ public class FSNamesystem implements FSC
private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
- private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
- DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+ private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
+ ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -455,6 +492,14 @@ public class FSNamesystem implements FSC
return this.fsLock.isWriteLockedByCurrentThread();
}
+ boolean hasReadLock() {
+ return this.fsLock.getReadHoldCount() > 0;
+ }
+
+ boolean hasReadOrWriteLock() {
+ return hasReadLock() || hasWriteLock();
+ }
+
/**
* dirs is a list of directories where the filesystem directory state
* is stored
@@ -553,7 +598,7 @@ public class FSNamesystem implements FSC
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
- this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+ this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
}
/**
@@ -565,11 +610,14 @@ public class FSNamesystem implements FSC
}
NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
- getClusterId(),
- getBlockPoolId(),
- dir.fsImage.getStorage().getCTime(),
- getDistributedUpgradeVersion());
+ readLock();
+ try {
+ return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+ getClusterId(), getBlockPoolId(),
+ dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -618,32 +666,32 @@ public class FSNamesystem implements FSC
void metaSave(String filename) throws IOException {
writeLock();
try {
- checkSuperuserPrivilege();
- File file = new File(System.getProperty("hadoop.log.dir"), filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
- true)));
-
- long totalInodes = this.dir.totalInodes();
- long totalBlocks = this.getBlocksTotal();
-
- ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
- this.DFSNodesStatus(live, dead);
-
- String str = totalInodes + " files and directories, " + totalBlocks
- + " blocks = " + (totalInodes + totalBlocks) + " total";
- out.println(str);
- out.println("Live Datanodes: "+live.size());
- out.println("Dead Datanodes: "+dead.size());
- blockManager.metaSave(out);
-
- //
- // Dump all datanodes
- //
- datanodeDump(out);
-
- out.flush();
- out.close();
+ checkSuperuserPrivilege();
+ File file = new File(System.getProperty("hadoop.log.dir"), filename);
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+ true)));
+
+ long totalInodes = this.dir.totalInodes();
+ long totalBlocks = this.getBlocksTotal();
+
+ ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+ ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+ this.DFSNodesStatus(live, dead);
+
+ String str = totalInodes + " files and directories, " + totalBlocks
+ + " blocks = " + (totalInodes + totalBlocks) + " total";
+ out.println(str);
+ out.println("Live Datanodes: "+live.size());
+ out.println("Dead Datanodes: "+dead.size());
+ blockManager.metaSave(out);
+
+ //
+ // Dump all datanodes
+ //
+ datanodeDump(out);
+
+ out.flush();
+ out.close();
} finally {
writeUnlock();
}
@@ -681,46 +729,46 @@ public class FSNamesystem implements FSC
throws IOException {
readLock();
try {
- checkSuperuserPrivilege();
-
- DatanodeDescriptor node = getDatanode(datanode);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
- + "Asking for blocks from an unrecorded node " + datanode.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
- datanode.getName() + ", but there is no info for it");
- }
-
- int numBlocks = node.numBlocks();
- if(numBlocks == 0) {
- return new BlocksWithLocations(new BlockWithLocations[0]);
- }
- Iterator<BlockInfo> iter = node.getBlockIterator();
- int startBlock = r.nextInt(numBlocks); // starting from a random block
- // skip blocks
- for(int i=0; i<startBlock; i++) {
- iter.next();
- }
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
- long totalSize = 0;
- BlockInfo curBlock;
- while(totalSize<size && iter.hasNext()) {
- curBlock = iter.next();
- if(!curBlock.isComplete()) continue;
- totalSize += addBlock(curBlock, results);
- }
- if(totalSize<size) {
- iter = node.getBlockIterator(); // start from the beginning
- for(int i=0; i<startBlock&&totalSize<size; i++) {
+ checkSuperuserPrivilege();
+
+ DatanodeDescriptor node = getDatanode(datanode);
+ if (node == null) {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ + "Asking for blocks from an unrecorded node " + datanode.getName());
+ throw new IllegalArgumentException(
+ "Unexpected exception. Got getBlocks message for datanode " +
+ datanode.getName() + ", but there is no info for it");
+ }
+
+ int numBlocks = node.numBlocks();
+ if(numBlocks == 0) {
+ return new BlocksWithLocations(new BlockWithLocations[0]);
+ }
+ Iterator<BlockInfo> iter = node.getBlockIterator();
+ int startBlock = r.nextInt(numBlocks); // starting from a random block
+ // skip blocks
+ for(int i=0; i<startBlock; i++) {
+ iter.next();
+ }
+ List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+ long totalSize = 0;
+ BlockInfo curBlock;
+ while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
- }
-
- return new BlocksWithLocations(
- results.toArray(new BlockWithLocations[results.size()]));
+ if(totalSize<size) {
+ iter = node.getBlockIterator(); // start from the beginning
+ for(int i=0; i<startBlock&&totalSize<size; i++) {
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
+ }
+ }
+
+ return new BlocksWithLocations(
+ results.toArray(new BlockWithLocations[results.size()]));
} finally {
readUnlock();
}
@@ -741,6 +789,7 @@ public class FSNamesystem implements FSC
* return the length of the added block; 0 if the block is not added
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
+ assert hasReadOrWriteLock();
ArrayList<String> machineSet = blockManager.getValidLocations(block);
if(machineSet.size() == 0) {
return 0;
@@ -763,21 +812,25 @@ public class FSNamesystem implements FSC
public void setPermission(String src, FsPermission permission)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set permission for " + src, safeMode);
- checkOwner(src);
- dir.setPermission(src, permission);
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set permission for " + src, safeMode);
+ }
+ checkOwner(src);
+ dir.setPermission(src, permission);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(src, false);
+ }
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "setPermission", src, null, stat);
- }
- } finally {
- writeUnlock();
+ "setPermission", src, null, resultingStat);
}
}
@@ -788,30 +841,34 @@ public class FSNamesystem implements FSC
public void setOwner(String src, String username, String group)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode);
- FSPermissionChecker pc = checkOwner(src);
- if (!pc.isSuper) {
- if (username != null && !pc.user.equals(username)) {
- throw new AccessControlException("Non-super user cannot change owner.");
}
- if (group != null && !pc.containsGroup(group)) {
- throw new AccessControlException("User does not belong to " + group
+ FSPermissionChecker pc = checkOwner(src);
+ if (!pc.isSuper) {
+ if (username != null && !pc.user.equals(username)) {
+ throw new AccessControlException("Non-super user cannot change owner.");
+ }
+ if (group != null && !pc.containsGroup(group)) {
+ throw new AccessControlException("User does not belong to " + group
+ " .");
+ }
+ }
+ dir.setOwner(src, username, group);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(src, false);
}
+ } finally {
+ writeUnlock();
}
- dir.setOwner(src, username, group);
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "setOwner", src, null, stat);
- }
- } finally {
- writeUnlock();
+ "setOwner", src, null, resultingStat);
}
}
@@ -840,7 +897,7 @@ public class FSNamesystem implements FSC
/**
* Get block locations within the specified range.
* @see ClientProtocol#getBlockLocations(String, long, long)
- * @throws FileNotFoundException
+ * @throws IOException on failure, including FileNotFoundException and UnresolvedLinkException
*/
LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
@@ -857,7 +914,7 @@ public class FSNamesystem implements FSC
throw new HadoopIllegalArgumentException(
"Negative length is not supported. File: " + src);
}
- final LocatedBlocks ret = getBlockLocationsInternal(src,
+ final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
offset, length, doAccessTime, needBlockToken);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -867,7 +924,11 @@ public class FSNamesystem implements FSC
return ret;
}
- private LocatedBlocks getBlockLocationsInternal(String src,
+ /*
+ * Get block locations within the specified range, updating the
+ * access times if necessary.
+ */
+ private LocatedBlocks getBlockLocationsUpdateTimes(String src,
long offset,
long length,
boolean doAccessTime,
@@ -918,8 +979,7 @@ public class FSNamesystem implements FSC
LocatedBlocks getBlockLocationsInternal(INodeFile inode,
long offset, long length, boolean needBlockToken)
throws IOException {
- readLock();
- try {
+ assert hasReadOrWriteLock();
final BlockInfo[] blocks = inode.getBlocks();
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -951,9 +1011,6 @@ public class FSNamesystem implements FSC
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
lastBlock, last.isComplete());
}
- } finally {
- readUnlock();
- }
}
/** Create a LocatedBlock. */
@@ -985,146 +1042,152 @@ public class FSNamesystem implements FSC
* @throws IOException
*/
public void concat(String target, String [] srcs)
- throws IOException, UnresolvedLinkException {
+ throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
- // check safe mode
- if (isInSafeMode()) {
- throw new SafeModeException("concat: cannot concat " + target, safeMode);
- }
// verify args
if(target.isEmpty()) {
- throw new IllegalArgumentException("concat: trg file name is empty");
+ throw new IllegalArgumentException("Target file name is empty");
}
if(srcs == null || srcs.length == 0) {
- throw new IllegalArgumentException("concat: srcs list is empty or null");
+ throw new IllegalArgumentException("No sources given");
}
- // currently we require all the files to be in the same dir
+ // We require all files be in the same directory
String trgParent =
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
- for(String s : srcs) {
+ for (String s : srcs) {
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
- if(! srcParent.equals(trgParent)) {
- throw new IllegalArgumentException
- ("concat: srcs and target shoould be in same dir");
+ if (!srcParent.equals(trgParent)) {
+ throw new IllegalArgumentException(
+ "Sources and target are not in the same directory");
}
}
-
+
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- // write permission for the target
- if (isPermissionEnabled) {
- checkPathAccess(target, FsAction.WRITE);
-
- // and srcs
- for(String aSrc: srcs) {
- checkPathAccess(aSrc, FsAction.READ); // read the file
- checkParentAccess(aSrc, FsAction.WRITE); // for delete
- }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot concat " + target, safeMode);
}
+ concatInternal(target, srcs);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(target, false);
+ }
+ } finally {
+ writeUnlock();
+ }
+ getEditLog().logSync();
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getLoginUser(),
+ Server.getRemoteIp(),
+ "concat", Arrays.toString(srcs), target, resultingStat);
+ }
+ }
+ /** See {@link #concat(String, String[])} */
+ public void concatInternal(String target, String [] srcs)
+ throws IOException, UnresolvedLinkException {
+ assert hasWriteLock();
- // to make sure no two files are the same
- Set<INode> si = new HashSet<INode>();
-
- // we put the following prerequisite for the operation
- // replication and blocks sizes should be the same for ALL the blocks
- // check the target
- INode inode = dir.getFileINode(target);
+ // write permission for the target
+ if (isPermissionEnabled) {
+ checkPathAccess(target, FsAction.WRITE);
- if(inode == null) {
- throw new IllegalArgumentException("concat: trg file doesn't exist");
- }
- if(inode.isUnderConstruction()) {
- throw new IllegalArgumentException("concat: trg file is uner construction");
+ // and srcs
+ for(String aSrc: srcs) {
+ checkPathAccess(aSrc, FsAction.READ); // read the file
+ checkParentAccess(aSrc, FsAction.WRITE); // for delete
}
+ }
- INodeFile trgInode = (INodeFile) inode;
+ // to make sure no two files are the same
+ Set<INode> si = new HashSet<INode>();
- // per design trg shouldn't be empty and all the blocks same size
- if(trgInode.blocks.length == 0) {
- throw new IllegalArgumentException("concat: "+ target + " file is empty");
- }
+ // we put the following prerequisite for the operation
+ // replication and blocks sizes should be the same for ALL the blocks
+ // check the target
+ INode inode = dir.getFileINode(target);
- long blockSize = trgInode.getPreferredBlockSize();
+ if(inode == null) {
+ throw new IllegalArgumentException("concat: trg file doesn't exist");
+ }
+ if(inode.isUnderConstruction()) {
+ throw new IllegalArgumentException("concat: trg file is uner construction");
+ }
- // check the end block to be full
- if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
- throw new IllegalArgumentException(target + " blocks size should be the same");
- }
+ INodeFile trgInode = (INodeFile) inode;
- si.add(trgInode);
- short repl = trgInode.getReplication();
+ // per design trg shouldn't be empty and all the blocks same size
+ if(trgInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: "+ target + " file is empty");
+ }
- // now check the srcs
- boolean endSrc = false; // final src file doesn't have to have full end block
- for(int i=0; i<srcs.length; i++) {
- String src = srcs[i];
- if(i==srcs.length-1)
- endSrc=true;
+ long blockSize = trgInode.getPreferredBlockSize();
- INodeFile srcInode = dir.getFileINode(src);
+ // check the end block to be full
+ if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+ throw new IllegalArgumentException(target + " blocks size should be the same");
+ }
- if(src.isEmpty()
- || srcInode == null
- || srcInode.isUnderConstruction()
- || srcInode.blocks.length == 0) {
- throw new IllegalArgumentException("concat: file " + src +
- " is invalid or empty or underConstruction");
- }
+ si.add(trgInode);
+ short repl = trgInode.getReplication();
- // check replication and blocks size
- if(repl != srcInode.getReplication()) {
- throw new IllegalArgumentException(src + " and " + target + " " +
- "should have same replication: "
- + repl + " vs. " + srcInode.getReplication());
- }
+ // now check the srcs
+ boolean endSrc = false; // final src file doesn't have to have full end block
+ for(int i=0; i<srcs.length; i++) {
+ String src = srcs[i];
+ if(i==srcs.length-1)
+ endSrc=true;
- //boolean endBlock=false;
- // verify that all the blocks are of the same length as target
- // should be enough to check the end blocks
- int idx = srcInode.blocks.length-1;
- if(endSrc)
- idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
- if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
- throw new IllegalArgumentException("concat: blocks sizes of " +
- src + " and " + target + " should all be the same");
- }
+ INodeFile srcInode = dir.getFileINode(src);
- si.add(srcInode);
+ if(src.isEmpty()
+ || srcInode == null
+ || srcInode.isUnderConstruction()
+ || srcInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: file " + src +
+ " is invalid or empty or underConstruction");
}
- // make sure no two files are the same
- if(si.size() < srcs.length+1) { // trg + srcs
- // it means at least two files are the same
- throw new IllegalArgumentException("at least two files are the same");
+ // check replication and blocks size
+ if(repl != srcInode.getReplication()) {
+ throw new IllegalArgumentException(src + " and " + target + " " +
+ "should have same replication: "
+ + repl + " vs. " + srcInode.getReplication());
}
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
- Arrays.toString(srcs) + " to " + target);
+ //boolean endBlock=false;
+ // verify that all the blocks are of the same length as target
+ // should be enough to check the end blocks
+ int idx = srcInode.blocks.length-1;
+ if(endSrc)
+ idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+ if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+ throw new IllegalArgumentException("concat: blocks sizes of " +
+ src + " and " + target + " should all be the same");
}
- dir.concatInternal(target,srcs);
- } finally {
- writeUnlock();
+ si.add(srcInode);
}
- getEditLog().logSync();
-
-
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(target, false);
- logAuditEvent(UserGroupInformation.getLoginUser(),
- Server.getRemoteIp(),
- "concat", Arrays.toString(srcs), target, stat);
+
+ // make sure no two files are the same
+ if(si.size() < srcs.length+1) { // trg + srcs
+ // it means at least two files are the same
+ throw new IllegalArgumentException("at least two files are the same");
}
-
- }
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+ }
+
+ dir.concat(target,srcs);
+ }
+
/**
* stores the modification and access time for this inode.
* The access time is precise upto an hour. The transaction, if needed, is
@@ -1138,23 +1201,22 @@ public class FSNamesystem implements FSC
}
writeLock();
try {
- //
- // The caller needs to have write access to set access & modification times.
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
- }
- INodeFile inode = dir.getFileINode(src);
- if (inode != null) {
- dir.setTimes(src, inode, mtime, atime, true);
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "setTimes", src, null, stat);
+ // Write access is required to set access and modification times
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+ INodeFile inode = dir.getFileINode(src);
+ if (inode != null) {
+ dir.setTimes(src, inode, mtime, atime, true);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ final HdfsFileStatus stat = dir.getFileInfo(src, false);
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "setTimes", src, null, stat);
+ }
+ } else {
+ throw new FileNotFoundException("File " + src + " does not exist.");
}
- } else {
- throw new FileNotFoundException("File " + src + " does not exist.");
- }
} finally {
writeUnlock();
}
@@ -1166,21 +1228,24 @@ public class FSNamesystem implements FSC
public void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ HdfsFileStatus resultingStat = null;
writeLock();
try {
- if (!createParent) {
- verifyParentDir(link);
- }
- createSymlinkInternal(target, link, dirPerms, createParent);
+ if (!createParent) {
+ verifyParentDir(link);
+ }
+ createSymlinkInternal(target, link, dirPerms, createParent);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(link, false);
+ }
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(link, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "createSymlink", link, target, stat);
+ "createSymlink", link, target, resultingStat);
}
}
@@ -1190,13 +1255,11 @@ public class FSNamesystem implements FSC
private void createSymlinkInternal(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
- writeLock();
- try {
+ assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" +
target + " link=" + link);
}
-
if (isInSafeMode()) {
throw new SafeModeException("Cannot create symlink " + link, safeMode);
}
@@ -1215,9 +1278,6 @@ public class FSNamesystem implements FSC
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
- } finally {
- writeUnlock();
- }
}
/**
@@ -1235,7 +1295,16 @@ public class FSNamesystem implements FSC
*/
public boolean setReplication(String src, short replication)
throws IOException, UnresolvedLinkException {
- boolean status = setReplicationInternal(src, replication);
+ boolean status = false;
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ }
+ status = setReplicationInternal(src, replication);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1248,10 +1317,7 @@ public class FSNamesystem implements FSC
private boolean setReplicationInternal(String src,
short replication) throws AccessControlException, QuotaExceededException,
SafeModeException, UnresolvedLinkException, IOException {
- writeLock();
- try {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ assert hasWriteLock();
blockManager.verifyReplication(src, replication, null);
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
@@ -1281,17 +1347,19 @@ public class FSNamesystem implements FSC
+ ". New replication is " + replication);
}
return true;
- } finally {
- writeUnlock();
- }
}
long getPreferredBlockSize(String filename)
- throws IOException, UnresolvedLinkException {
- if (isPermissionEnabled) {
- checkTraverse(filename);
+ throws IOException, UnresolvedLinkException {
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ checkTraverse(filename);
+ }
+ return dir.getPreferredBlockSize(filename);
+ } finally {
+ readUnlock();
}
- return dir.getPreferredBlockSize(filename);
}
/*
@@ -1299,6 +1367,7 @@ public class FSNamesystem implements FSC
*/
private void verifyParentDir(String src) throws FileNotFoundException,
ParentNotDirectoryException, UnresolvedLinkException {
+ assert hasReadOrWriteLock();
Path parent = new Path(src).getParent();
if (parent != null) {
INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
@@ -1324,8 +1393,13 @@ public class FSNamesystem implements FSC
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
- startFileInternal(src, permissions, holder, clientMachine, flag,
- createParent, replication, blockSize);
+ writeLock();
+ try {
+ startFileInternal(src, permissions, holder, clientMachine, flag,
+ createParent, replication, blockSize);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -1357,6 +1431,7 @@ public class FSNamesystem implements FSC
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
+ assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
@@ -1365,10 +1440,9 @@ public class FSNamesystem implements FSC
+ ", replication=" + replication
+ ", createFlag=" + flag.toString());
}
- writeLock();
- try {
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot create file" + src, safeMode);
+ }
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
@@ -1476,9 +1550,6 @@ public class FSNamesystem implements FSC
+ie.getMessage());
throw ie;
}
- } finally {
- writeUnlock();
- }
return null;
}
@@ -1493,35 +1564,41 @@ public class FSNamesystem implements FSC
* @return true if the file is already closed
* @throws IOException
*/
- synchronized boolean recoverLease(String src, String holder, String clientMachine)
- throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException(
- "Cannot recover the lease of " + src, safeMode);
- }
- if (!DFSUtil.isValidName(src)) {
- throw new IOException("Invalid file name: " + src);
- }
-
- INode inode = dir.getFileINode(src);
- if (inode == null) {
- throw new FileNotFoundException("File not found " + src);
- }
-
- if (!inode.isUnderConstruction()) {
- return true;
- }
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
+ boolean recoverLease(String src, String holder, String clientMachine)
+ throws IOException {
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot recover the lease of " + src, safeMode);
+ }
+ if (!DFSUtil.isValidName(src)) {
+ throw new IOException("Invalid file name: " + src);
+ }
+
+ INode inode = dir.getFileINode(src);
+ if (inode == null) {
+ throw new FileNotFoundException("File not found " + src);
+ }
+
+ if (!inode.isUnderConstruction()) {
+ return true;
+ }
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+
+ recoverLeaseInternal(inode, src, holder, clientMachine, true);
+ } finally {
+ writeUnlock();
}
-
- recoverLeaseInternal(inode, src, holder, clientMachine, true);
return false;
}
private void recoverLeaseInternal(INode fileInode,
String src, String holder, String clientMachine, boolean force)
- throws IOException {
+ throws IOException {
+ assert hasWriteLock();
if (fileInode != null && fileInode.isUnderConstruction()) {
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
//
@@ -1607,12 +1684,16 @@ public class FSNamesystem implements FSC
throw new UnsupportedOperationException("Append to hdfs not supported." +
" Please refer to dfs.support.append configuration parameter.");
}
- LocatedBlock lb =
- startFileInternal(src, null, holder, clientMachine,
+ LocatedBlock lb = null;
+ writeLock();
+ try {
+ lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, (short)blockManager.maxReplication, (long)0);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
-
if (lb != null) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1621,7 +1702,6 @@ public class FSNamesystem implements FSC
+" block size " + lb.getBlock().getNumBytes());
}
}
-
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
@@ -1713,6 +1793,9 @@ public class FSNamesystem implements FSC
// Allocate a new block and record it in the INode.
writeLock();
try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
+ }
INode[] pathINodes = dir.getExistingPathINodes(src);
int inodesLen = pathINodes.length;
checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1732,7 +1815,7 @@ public class FSNamesystem implements FSC
} finally {
writeUnlock();
}
-
+
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
if (isBlockTokenEnabled) {
@@ -1752,6 +1835,7 @@ public class FSNamesystem implements FSC
final DatanodeDescriptor clientnode;
final long preferredblocksize;
+ final List<DatanodeDescriptor> chosen;
readLock();
try {
//check safe mode
@@ -1764,17 +1848,17 @@ public class FSNamesystem implements FSC
final INodeFileUnderConstruction file = checkLease(src, clientName);
clientnode = file.getClientNode();
preferredblocksize = file.getPreferredBlockSize();
- } finally {
- readUnlock();
- }
- //find datanode descriptors
- final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
- for(DatanodeInfo d : existings) {
- final DatanodeDescriptor descriptor = getDatanode(d);
- if (descriptor != null) {
- chosen.add(descriptor);
+ //find datanode descriptors
+ chosen = new ArrayList<DatanodeDescriptor>();
+ for(DatanodeInfo d : existings) {
+ final DatanodeDescriptor descriptor = getDatanode(d);
+ if (descriptor != null) {
+ chosen.add(descriptor);
+ }
}
+ } finally {
+ readUnlock();
}
// choose new datanodes.
@@ -1793,25 +1877,28 @@ public class FSNamesystem implements FSC
* The client would like to let go of the given block
*/
public boolean abandonBlock(ExtendedBlock b, String src, String holder)
- throws LeaseExpiredException, FileNotFoundException,
- UnresolvedLinkException, IOException {
- writeLock();
- try {
- //
- // Remove the block from the pending creates list
- //
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b+"of file "+src);
- }
- INodeFileUnderConstruction file = checkLease(src, holder);
- dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- + b
- + " is removed from pendingCreates");
- }
- return true;
+ throws LeaseExpiredException, FileNotFoundException,
+ UnresolvedLinkException, IOException {
+ writeLock();
+ try {
+ //
+ // Remove the block from the pending creates list
+ //
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ +b+" of file "+src);
+ }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot abandon block " + b +
+ " for file " + src, safeMode);
+ }
+ INodeFileUnderConstruction file = checkLease(src, holder);
+ dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ + b + " is removed from pendingCreates");
+ }
+ return true;
} finally {
writeUnlock();
}
@@ -1819,7 +1906,8 @@ public class FSNamesystem implements FSC
// make sure that we still have the lease on this file.
private INodeFileUnderConstruction checkLease(String src, String holder)
- throws LeaseExpiredException, UnresolvedLinkException {
+ throws LeaseExpiredException, UnresolvedLinkException {
+ assert hasReadOrWriteLock();
INodeFile file = dir.getFileINode(src);
checkLease(src, holder, file);
return (INodeFileUnderConstruction)file;
@@ -1827,7 +1915,7 @@ public class FSNamesystem implements FSC
private void checkLease(String src, String holder, INode file)
throws LeaseExpiredException {
-
+ assert hasReadOrWriteLock();
if (file == null || file.isDirectory()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException("No lease on " + src +
@@ -1860,23 +1948,29 @@ public class FSNamesystem implements FSC
public boolean completeFile(String src, String holder, ExtendedBlock last)
throws SafeModeException, UnresolvedLinkException, IOException {
checkBlock(last);
- boolean success = completeFileInternal(src, holder,
+ boolean success = false;
+ writeLock();
+ try {
+ success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
- return success ;
+ return success;
}
private boolean completeFileInternal(String src,
String holder, Block last) throws SafeModeException,
UnresolvedLinkException, IOException {
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
+ assert hasWriteLock();
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
- if (isInSafeMode())
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot complete file " + src, safeMode);
+ }
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
// commit the last block and complete it if it has minimum replicas
@@ -1891,9 +1985,6 @@ public class FSNamesystem implements FSC
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
+ " is closed by " + holder);
return true;
- } finally {
- writeUnlock();
- }
}
/**
@@ -1922,6 +2013,7 @@ public class FSNamesystem implements FSC
*/
private Block allocateBlock(String src, INode[] inodes,
DatanodeDescriptor targets[]) throws QuotaExceededException {
+ assert hasWriteLock();
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
@@ -1939,35 +2031,35 @@ public class FSNamesystem implements FSC
* all blocks, otherwise check only penultimate block.
*/
boolean checkFileProgress(INodeFile v, boolean checkall) {
- writeLock();
+ readLock();
try {
- if (checkall) {
- //
- // check all blocks of the file.
- //
- for (BlockInfo block: v.getBlocks()) {
- if (!block.isComplete()) {
+ if (checkall) {
+ //
+ // check all blocks of the file.
+ //
+ for (BlockInfo block: v.getBlocks()) {
+ if (!block.isComplete()) {
+ LOG.info("BLOCK* NameSystem.checkFileProgress: "
+ + "block " + block + " has not reached minimal replication "
+ + blockManager.minReplication);
+ return false;
+ }
+ }
+ } else {
+ //
+ // check the penultimate block of this file
+ //
+ BlockInfo b = v.getPenultimateBlock();
+ if (b != null && !b.isComplete()) {
LOG.info("BLOCK* NameSystem.checkFileProgress: "
- + "block " + block + " has not reached minimal replication "
+ + "block " + b + " has not reached minimal replication "
+ blockManager.minReplication);
return false;
}
}
- } else {
- //
- // check the penultimate block of this file
- //
- BlockInfo b = v.getPenultimateBlock();
- if (b != null && !b.isComplete()) {
- LOG.info("BLOCK* NameSystem.checkFileProgress: "
- + "block " + b + " has not reached minimal replication "
- + blockManager.minReplication);
- return false;
- }
- }
- return true;
+ return true;
} finally {
- writeUnlock();
+ readUnlock();
}
}
@@ -2006,13 +2098,26 @@ public class FSNamesystem implements FSC
@Deprecated
boolean renameTo(String src, String dst)
throws IOException, UnresolvedLinkException {
- boolean status = renameToInternal(src, dst);
+ boolean status = false;
+ HdfsFileStatus resultingStat = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+ " to " + dst);
+ }
+ writeLock();
+ try {
+ status = renameToInternal(src, dst);
+ if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(dst, false);
+ }
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
- final HdfsFileStatus stat = dir.getFileInfo(dst, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
- "rename", src, dst, stat);
+ "rename", src, dst, resultingStat);
}
return status;
}
@@ -2021,19 +2126,13 @@ public class FSNamesystem implements FSC
@Deprecated
private boolean renameToInternal(String src, String dst)
throws IOException, UnresolvedLinkException {
-
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
- " to " + dst);
- }
- if (isInSafeMode())
+ assert hasWriteLock();
+ if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
+ }
if (!DFSUtil.isValidName(dst)) {
throw new IOException("Invalid name: " + dst);
}
-
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
//but for now,
@@ -2045,40 +2144,44 @@ public class FSNamesystem implements FSC
HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
if (dir.renameTo(src, dst)) {
- changeLease(src, dst, dinfo); // update lease with new filename
+ unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
return true;
}
return false;
- } finally {
- writeUnlock();
- }
}
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
- renameToInternal(src, dst, options);
+ HdfsFileStatus resultingStat = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ + src + " to " + dst);
+ }
+ writeLock();
+ try {
+ renameToInternal(src, dst, options);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ resultingStat = dir.getFileInfo(dst, false);
+ }
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
StringBuilder cmd = new StringBuilder("rename options=");
for (Rename option : options) {
cmd.append(option.value()).append(" ");
}
- final HdfsFileStatus stat = dir.getFileInfo(dst, false);
logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
- cmd.toString(), src, dst, stat);
+ cmd.toString(), src, dst, resultingStat);
}
}
private void renameToInternal(String src, String dst,
Options.Rename... options) throws IOException {
- writeLock();
- try {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
- + src + " to " + dst);
- }
+ assert hasWriteLock();
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
@@ -2092,10 +2195,7 @@ public class FSNamesystem implements FSC
HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
dir.renameTo(src, dst, options);
- changeLease(src, dst, dinfo); // update lease with new filename
- } finally {
- writeUnlock();
- }
+ unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
}
/**
@@ -2105,15 +2205,12 @@ public class FSNamesystem implements FSC
* description of exceptions
*/
public boolean delete(String src, boolean recursive)
- throws AccessControlException, SafeModeException,
- UnresolvedLinkException, IOException {
- if ((!recursive) && (!dir.isDirEmpty(src))) {
- throw new IOException(src + " is non empty");
- }
+ throws AccessControlException, SafeModeException,
+ UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
- boolean status = deleteInternal(src, true);
+ boolean status = deleteInternal(src, recursive, true);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
@@ -2133,9 +2230,10 @@ public class FSNamesystem implements FSC
*
* @see ClientProtocol#delete(String, boolean) for description of exceptions
*/
- private boolean deleteInternal(String src, boolean enforcePermission)
- throws AccessControlException, SafeModeException,
- UnresolvedLinkException, IOException{
+ private boolean deleteInternal(String src, boolean recursive,
+ boolean enforcePermission)
+ throws AccessControlException, SafeModeException, UnresolvedLinkException,
+ IOException {
boolean deleteNow = false;
ArrayList<Block> collectedBlocks = new ArrayList<Block>();
@@ -2144,6 +2242,9 @@ public class FSNamesystem implements FSC
if (isInSafeMode()) {
throw new SafeModeException("Cannot delete " + src, safeMode);
}
+ if (!recursive && !dir.isDirEmpty(src)) {
+ throw new IOException(src + " is non empty");
+ }
if (enforcePermission && isPermissionEnabled) {
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
}
@@ -2158,10 +2259,16 @@ public class FSNamesystem implements FSC
} finally {
writeUnlock();
}
- // Log directory deletion to editlog
+
getEditLog().logSync();
- if (!deleteNow) {
- removeBlocks(collectedBlocks); // Incremental deletion of blocks
+
+ writeLock();
+ try {
+ if (!deleteNow) {
+ removeBlocks(collectedBlocks); // Incremental deletion of blocks
+ }
+ } finally {
+ writeUnlock();
}
collectedBlocks.clear();
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2173,24 +2280,21 @@ public class FSNamesystem implements FSC
/** From the given list, incrementally remove the blocks from blockManager */
private void removeBlocks(List<Block> blocks) {
+ assert hasWriteLock();
int start = 0;
int end = 0;
while (start < blocks.size()) {
end = BLOCK_DELETION_INCREMENT + start;
end = end > blocks.size() ? blocks.size() : end;
- writeLock();
- try {
- for (int i=start; i<end; i++) {
- blockManager.removeBlock(blocks.get(i));
- }
- } finally {
- writeUnlock();
+ for (int i=start; i<end; i++) {
+ blockManager.removeBlock(blocks.get(i));
}
start = end;
}
}
void removePathAndBlocks(String src, List<Block> blocks) {
+ assert hasWriteLock();
leaseManager.removeLeaseWithPrefixPath(src);
if (blocks == null) {
return;
@@ -2213,13 +2317,18 @@ public class FSNamesystem implements FSC
*/
HdfsFileStatus getFileInfo(String src, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
- if (!DFSUtil.isValidName(src)) {
- throw new InvalidPathException("Invalid file name: " + src);
- }
- if (isPermissionEnabled) {
- checkTraverse(src);
+ readLock();
+ try {
+ if (!DFSUtil.isValidName(src)) {
+ throw new InvalidPathException("Invalid file name: " + src);
+ }
+ if (isPermissionEnabled) {
+ checkTraverse(src);
+ }
+ return dir.getFileInfo(src, resolveLink);
+ } finally {
+ readUnlock();
}
- return dir.getFileInfo(src, resolveLink);
}
/**
@@ -2227,7 +2336,16 @@ public class FSNamesystem implements FSC
*/
public boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
- boolean status = mkdirsInternal(src, permissions, createParent);
+ boolean status = false;
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+ }
+ writeLock();
+ try {
+ status = mkdirsInternal(src, permissions, createParent);
+ } finally {
+ writeUnlock();
+ }
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -2244,10 +2362,9 @@ public class FSNamesystem implements FSC
private boolean mkdirsInternal(String src,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
- writeLock();
- try {
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+ assert hasWriteLock();
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot create directory " + src, safeMode);
}
if (isPermissionEnabled) {
checkTraverse(src);
@@ -2257,15 +2374,12 @@ public class FSNamesystem implements FSC
// a new directory is not created.
return true;
}
- if (isInSafeMode())
- throw new SafeModeException("Cannot create directory " + src, safeMode);
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
if (isPermissionEnabled) {
checkAncestorAccess(src, FsAction.WRITE);
}
-
if (!createParent) {
verifyParentDir(src);
}
@@ -2279,17 +2393,19 @@ public class FSNamesystem implements FSC
throw new IOException("Failed to create directory: " + src);
}
return true;
- } finally {
- writeUnlock();
- }
}
ContentSummary getContentSummary(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException {
- if (isPermissionEnabled) {
- checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+ }
+ return dir.getContentSummary(src);
+ } finally {
+ readUnlock();
}
- return dir.getContentSummary(src);
}
/**
@@ -2299,12 +2415,18 @@ public class FSNamesystem implements FSC
*/
void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set quota on " + path, safeMode);
- if (isPermissionEnabled) {
- checkSuperuserPrivilege();
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set quota on " + path, safeMode);
+ }
+ if (isPermissionEnabled) {
+ checkSuperuserPrivilege();
+ }
+ dir.setQuota(path, nsQuota, dsQuota);
+ } finally {
+ writeUnlock();
}
- dir.setQuota(path, nsQuota, dsQuota);
getEditLog().logSync();
}
@@ -2315,7 +2437,6 @@ public class FSNamesystem implements FSC
*/
void fsync(String src, String clientName)
throws IOException, UnresolvedLinkException {
-
NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+ src + " for " + clientName);
writeLock();
@@ -2328,6 +2449,7 @@ public class FSNamesystem implements FSC
} finally {
writeUnlock();
}
+ getEditLog().logSync();
}
/**
@@ -2347,9 +2469,8 @@ public class FSNamesystem implements FSC
String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
IOException, UnresolvedLinkException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
-
assert !isInSafeMode();
-
+ assert hasWriteLock();
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
final String message = "DIR* NameSystem.internalReleaseLease: "
@@ -2471,6 +2592,7 @@ public class FSNamesystem implements FSC
Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
+ assert hasWriteLock();
if(newHolder == null)
return lease;
logReassignLease(lease.getHolder(), src, newHolder);
@@ -2479,6 +2601,7 @@ public class FSNamesystem implements FSC
Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
+ assert hasWriteLock();
pendingFile.setClientName(newHolder);
return leaseManager.reassignLease(lease, src, newHolder);
}
@@ -2487,7 +2610,7 @@ public class FSNamesystem implements FSC
private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile)
throws IOException, UnresolvedLinkException {
-
+ assert hasWriteLock();
leaseManager.removeLease(pendingFile.getClientName(), src);
// The file is no longer pending.
@@ -2508,92 +2631,96 @@ public class FSNamesystem implements FSC
String src = "";
writeLock();
try {
- LOG.info("commitBlockSynchronization(lastblock=" + lastblock
- + ", newgenerationstamp=" + newgenerationstamp
- + ", newlength=" + newlength
- + ", newtargets=" + Arrays.asList(newtargets)
- + ", closeFile=" + closeFile
- + ", deleteBlock=" + deleteblock
- + ")");
- final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot commitBlockSynchronization while in safe mode",
+ safeMode);
+ }
+ LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ + ", newgenerationstamp=" + newgenerationstamp
+ + ", newlength=" + newlength
+ + ", newtargets=" + Arrays.asList(newtargets)
+ + ", closeFile=" + closeFile
+ + ", deleteBlock=" + deleteblock
+ + ")");
+ final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
.getLocalBlock(lastblock));
- if (storedBlock == null) {
- throw new IOException("Block (=" + lastblock + ") not found");
- }
- INodeFile iFile = storedBlock.getINode();
- if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
- throw new IOException("Unexpected block (=" + lastblock
- + ") since the file (=" + iFile.getLocalName()
- + ") is not under construction");
- }
-
- long recoveryId =
- ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
- if(recoveryId != newgenerationstamp) {
- throw new IOException("The recovery id " + newgenerationstamp
- + " does not match current recovery id "
- + recoveryId + " for block " + lastblock);
- }
-
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+ if (storedBlock == null) {
+ throw new IOException("Block (=" + lastblock + ") not found");
+ }
+ INodeFile iFile = storedBlock.getINode();
+ if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+ throw new IOException("Unexpected block (=" + lastblock
+ + ") since the file (=" + iFile.getLocalName()
+ + ") is not under construction");
+ }
+
+ long recoveryId =
+ ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
- if (deleteblock) {
- pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
- blockManager.removeBlockFromMap(storedBlock);
- }
- else {
- // update last block
- storedBlock.setGenerationStamp(newgenerationstamp);
- storedBlock.setNumBytes(newlength);
-
- // find the DatanodeDescriptor objects
- // There should be no locations in the blockManager till now because the
- // file is underConstruction
- DatanodeDescriptor[] descriptors = null;
- if (newtargets.length > 0) {
- descriptors = new DatanodeDescriptor[newtargets.length];
- for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
+ if (deleteblock) {
+ pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
+ blockManager.removeBlockFromMap(storedBlock);
+ }
+ else {
+ // update last block
+ storedBlock.setGenerationStamp(newgenerationstamp);
+ storedBlock.setNumBytes(newlength);
+
+ // find the DatanodeDescriptor objects
+ // There should be no locations in the blockManager till now because the
+ // file is underConstruction
+ DatanodeDescriptor[] descriptors = null;
+ if (newtargets.length > 0) {
+ descriptors = new DatanodeDescriptor[newtargets.length];
+ for(int i = 0; i < newtargets.length; i++) {
+ descriptors[i] = getDatanode(newtargets[i]);
+ }
+ }
+ if (closeFile) {
+ // the file is getting closed. Insert block locations into blockManager.
+ // Otherwise fsck will report these blocks as MISSING, especially if the
+ // blocksReceived from Datanodes take a long time to arrive.
+ for (int i = 0; i < descriptors.length; i++) {
+ descriptors[i].addBlock(storedBlock);
+ }
}
+ // add pipeline locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(storedBlock, descriptors);
}
+
+ src = leaseManager.findPath(pendingFile);
if (closeFile) {
- // the file is getting closed. Insert block locations into blockManager.
- // Otherwise fsck will report these blocks as MISSING, especially if the
- // blocksReceived from Datanodes take a long time to arrive.
- for (int i = 0; i < descriptors.length; i++) {
- descriptors[i].addBlock(storedBlock);
- }
- }
- // add pipeline locations into the INodeUnderConstruction
- pendingFile.setLastBlock(storedBlock, descriptors);
- }
-
- // If this commit does not want to close the file, persist
- // blocks only if append is supported and return
- src = leaseManager.findPath(pendingFile);
- if (!closeFile) {
- if (supportAppends) {
+ // commit the last block and complete it if it has minimum replicas
+ blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+ //remove lease, close file
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ } else if (supportAppends) {
+ // If this commit does not want to close the file, persist
+ // blocks only if append is supported
dir.persistBlocks(src, pendingFile);
- getEditLog().logSync();
}
- LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
- return;
- }
-
- // commit the last block and complete it if it has minimum replicas
- blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
-
- //remove lease, close file
- finalizeINodeFileUnderConstruction(src, pendingFile);
} finally {
writeUnlock();
}
getEditLog().logSync();
- LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ if (closeFile) {
+ LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ ", file=" + src
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+ } else {
+ LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+ }
}
@@ -2601,9 +2728,15 @@ public class FSNamesystem implements FSC
* Renew the lease(s) held by the given client
*/
void renewLease(String holder) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
- leaseManager.renewLease(holder);
+ writeLock();
+ try {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+ }
+ leaseManager.renewLease(holder);
+ } finally {
+ writeUnlock();
+ }
}
/**
@@ -2621,20 +2754,26 @@ public class FSNamesystem implements FSC
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation)
throws AccessControlException, UnresolvedLinkException, IOException {
- if (isPermissionEnabled) {
- if (dir.isDir(src)) {
- checkPathAccess(src, FsAction.READ_EXECUTE);
+ DirectoryListing dl;
+ readLock();
+ try {
+ if (isPermissionEnabled) {
+ if (dir.isDir(src)) {
+ checkPathAccess(src, FsAction.READ_EXECUTE);
+ } else {
+ checkTraverse(src);
+ }
}
- else {
- checkTraverse(src);
+ if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+ logAuditEvent(UserGroupInformation.getCurrentUser(),
+ Server.getRemoteIp(),
+ "listStatus", src, null, null);
}
+ dl = dir.getListing(src, startAfter, needLocation);
+ } finally {
+ readUnlock();
}
- if (auditLog.isInfoEnabled() && isExternalInvocation()) {
- logAuditEvent(UserGroupInformation.getCurrentUser(),
- Server.getRemoteIp(),
- "listStatus", src, null, null);
- }
- return dir.getListing(src, startAfter, needLocation);
+ return dl;
}
/////////////////////////////////////////////////////////
@@ -2665,10 +2804,20 @@ public class FSNamesystem implements FSC
*
* @see org.apache.hadoop.hdfs.server.datanode.DataNode
*/
- public void registerDatanode(DatanodeRegistration nodeReg
- ) throws IOException {
+ public void registerDatanode(DatanodeRegistration nodeReg)
+ throws IOException {
writeLock();
try {
+ registerDatanodeInternal(nodeReg);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /** @see #registerDatanode(DatanodeRegistration) */
+ public void registerDatanodeInternal(DatanodeRegistration nodeReg)
+ throws IOException {
+ assert hasWriteLock();
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
@@ -2785,17 +2934,12 @@ public class FSNamesystem implements FSC
// because its is done when the descriptor is created
}
- if (safeMode != null) {
- safeMode.checkMode();
- }
- return;
- } finally {
- writeUnlock();
- }
+ checkSafeMode();
}
/* Resolve a node's network location */
private void resolveNetworkLocation (DatanodeDescriptor node) {
+ assert hasWriteLock();
List<String> names = new ArrayList<String>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
// get the node's IP address
@@ -2872,7 +3016,23 @@ public class FSNamesystem implements FSC
DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
- throws IOException {
+ throws IOException {
+ readLock();
+ try {
+ return handleHeartbeatInternal(nodeReg, capacity, dfsUsed,
+ remaining, blockPoolUsed, xceiverCount, xmitsInProgress,
+ failedVolumes);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
+ DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
+ long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+ int xceiverCount, int xmitsInProgress, int failedVolumes)
+ throws IOException {
+ assert hasReadLock();
DatanodeCommand cmd = null;
synchronized (heartbeats) {
synchronized (datanodeMap) {
@@ -3113,10 +3273,14 @@ public class FSNamesystem implements FSC
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
- // blocks should not be replicated or removed if safe mode is on
+ // Blocks should not be replicated or removed if in safe mode.
+ // It's OK to check safe mode here without holding the lock: in the
+ // worst case extra replications will be scheduled, and these will
+ // get fixed up later.
if (isInSafeMode())
return workFound;
- synchronized(heartbeats) {
+
+ synchronized (heartbeats) {
blocksToProcess = (int)(heartbeats.size()
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
nodesToProcess = (int)Math.ceil((double)heartbeats.size()
@@ -3150,13 +3314,13 @@ public class FSNamesystem implements FSC
throws IOException {
writeLock();
try {
- DatanodeDescriptor nodeInfo = getDatanode(nodeID);
- if (nodeInfo != null) {
- removeDatanode(nodeInfo);
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
- + nodeID.getName() + " does not exist");
- }
+ DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+ if (nodeInfo != null) {
+ removeDatanode(nodeInfo);
+ } else {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+ + nodeID.getName() + " does not exist");
+ }
} finally {
writeUnlock();
}
@@ -3167,6 +3331,7 @@ public class FSNamesystem implements FSC
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
+ assert hasWriteLock();
synchronized (heartbeats) {
if (nodeInfo.isAlive) {
updateStats(nodeInfo, false);
@@ -3180,14 +3345,13 @@ public class FSNamesystem implements FSC
blockManager.removeStoredBlock(it.next(), nodeInfo);
}
unprotectedRemoveDatanode(nodeInfo);
- clusterMap.remove(nodeInfo);
-
- if (safeMode != null) {
- safeMode.checkMode();
- }
+ clusterMap.remove(nodeInfo);
+
+ checkSafeMode();
}
void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
+ assert hasWriteLock();
nodeDescr.resetBlocks();
blockManager.removeFromInvalidates(nodeDescr.getStorageID());
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3198,12 +3362,14 @@ public class FSNamesystem implements FSC
}
void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
- /* To keep host2DataNodeMap consistent with datanodeMap,
- remove from host2DataNodeMap the datanodeDescriptor removed
- from datanodeMap before adding nodeDescr to host2DataNodeMap.
- */
- host2DataNodeMap.remove(
+ assert hasWriteLock();
+ // To keep host2DataNodeMap consistent with datanodeMap,
+ // remove from host2DataNodeMap the datanodeDescriptor removed
+ // from datanodeMap before adding nodeDescr to host2DataNodeMap.
+ synchronized (datanodeMap) {
+ host2DataNodeMap.remove(
datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
+ }
host2DataNodeMap.add(nodeDescr);
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3220,8 +3386,11 @@ public class FSNamesystem implements FSC
* @throws IOException
*/
void wipeDatanode(DatanodeID nodeID) throws IOException {
+ assert hasWriteLock();
String key = nodeID.getStorageID();
- host2DataNodeMap.remove(datanodeMap.remove(key));
+ synchronized (datanodeMap) {
+ host2DataNodeMap.remove(datanodeMap.remove(key));
+ }
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.wipeDatanode: "
@@ -3246,8 +3415,9 @@ public class FSNamesystem implements FSC
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
+ // It's OK to check safe mode without taking the lock here; we re-check
+ // for safe mode after taking the lock, before removing a datanode.
if (isInSafeMode()) {
- // not to check dead nodes if in safemode
return;
}
boolean allAlive = false;
@@ -3272,6 +3442,9 @@ public class FSNamesystem implements FSC
// acquire the fsnamesystem lock, and then remove the dead node.
if (foundDead) {
writeLock();
+ if (isInSafeMode()) {
+ return;
+ }
try {
synchronized(heartbeats) {
synchronized (datanodeMap) {
@@ -3307,21 +3480,21 @@ public class FSNamesystem implements FSC
writeLock();
startTime = now(); //after acquiring write lock
try {
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- throw new IOException("ProcessReport from dead or unregistered node: "
- + nodeID.getName());
- }
- // To minimize startup time, we discard any second (or later) block reports
- // that we receive while still in startup phase.
- if (isInStartupSafeMode() && node.numBlocks() > 0) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
- + "discarded non-initial block report from " + nodeID.getName()
- + " because namenode still in startup phase");
- return;
- }
-
- blockManager.processReport(node, newReport);
+ DatanodeDescriptor node = getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ throw new IOException("ProcessReport from dead or unregistered node: "
+ + nodeID.getName());
+ }
+ // To minimize startup time, we discard any second (or later) block reports
+ // that we receive while still in startup phase.
+ if (isInStartupSafeMode() && node.numBlocks() > 0) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+ + "discarded non-initial block report from " + nodeID.getName()
+ + " because namenode still in startup phase");
+ return;
+ }
+
+ blockManager.processReport(node, newReport);
} finally {
endTime = now();
writeUnlock();
@@ -3353,6 +3526,7 @@ public class FSNamesystem implements FSC
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
+ assert hasWriteLock();
// first form a rack to datanodes map and
INodeFile inode = blockManager.getINode(b);
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
@@ -3446,20 +3620,20 @@ public class FSNamesystem implements FSC
) throws IOException {
writeLock();
try {
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
- + " is received from dead or unregistered node " + nodeID.getName());
- throw new IOException(
- "Got blockReceived message from unregistered or dead node " + block);
- }
-
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
- +block+" is received from " + nodeID.getName());
- }
-
- blockManager.addBlock(node, block, delHint);
+ DatanodeDescriptor node = getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+ + " is received from dead or unregistered node " + nodeID.getName());
+ throw new IOException(
+ "Got blockReceived message from unregistered or dead node " + block);
+ }
+
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+ +block+" is received from " + nodeID.getName());
+ }
+
+ blockManager.addBlock(node, block, delHint);
} finally {
writeUnlock();
}
@@ -3576,75 +3750,74 @@ public class FSNamesystem implements FSC
}
private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
- DatanodeReportType type) {
- boolean listLiveNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.LIVE;
- boolean listDeadNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.DEAD;
-
- HashMap<String, String> mustList = new HashMap<String, String>();
-
+ DatanodeReportType type) {
readLock();
- try {
- if (listDeadNodes) {
- //first load all the nodes listed in include and exclude files.
- for (Iterator<String> it = hostsReader.getHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
+ try {
+ boolean listLiveNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.LIVE;
+ boolean listDeadNodes = type == DatanodeReportType.ALL ||
+ type == DatanodeReportType.DEAD;
+
+ HashMap<String, String> mustList = new HashMap<String, String>();
+
+ if (listDeadNodes) {
+ //first load all the nodes listed in include and exclude files.
+ Iterator<String> it = hostsReader.getHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
+ it = hostsReader.getExcludedHosts().iterator();
+ while (it.hasNext()) {
+ mustList.put(it.next(), "");
+ }
}
- for (Iterator<String> it = hostsReader.getExcludedHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
+
+ ArrayList<DatanodeDescriptor> nodes = null;
+
+ synchronized (datanodeMap) {
+ nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
+ mustList.size());
+ Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn = it.next();
+ boolean isDead = isDatanodeDead(dn);
+ if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+ nodes.add(dn);
+ }
+ //Remove any form of this datanode from the include/exclude lists.
+ mustList.remove(dn.getName());
+ mustList.remove(dn.getHost());
+ mustList.remove(dn.getHostName());
+ }
}
- }
-
- ArrayList<DatanodeDescriptor> nodes = null;
-
- synchronized (datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
- mustList.size());
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean isDead = isDatanodeDead(dn);
- if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+ if (listDeadNodes) {
+ Iterator<String> it = mustList.keySet().iterator();
+ while (it.hasNext()) {
+ DatanodeDescriptor dn =
+ new DatanodeDescriptor(new DatanodeID(it.next()));
+ dn.setLastUpdate(0);
nodes.add(dn);
}
- //Remove any form of the this datanode in include/exclude lists.
- mustList.remove(dn.getName());
- mustList.remove(dn.getHost());
- mustList.remove(dn.getHostName());
}
- }
-
- if (listDeadNodes) {
- for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
- DatanodeDescriptor dn =
- new DatanodeDescriptor(new DatanodeID(it.next()));
- dn.setLastUpdate(0);
- nodes.add(dn);
- }
- }
-
- return nodes;
+ return nodes;
} finally {
readUnlock();
}
}
- public DatanodeInfo[] datanodeReport( DatanodeReportType type
- ) throws AccessControlException {
+ public DatanodeInfo[] datanodeReport( DatanodeReportType type)
+ throws AccessControlException {
readLock();
try {
- checkSuperuserPrivilege();
-
- ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
- DatanodeInfo[] arr = new DatanodeInfo[results.size()];
- for (int i=0; i<arr.length; i++) {
- arr[i] = new DatanodeInfo(results.get(i));
- }
- return arr;
+ checkSuperuserPrivilege();
+
+ ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+ DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+ for (int i=0; i<arr.length; i++) {
+ arr[i] = new DatanodeInfo(results.get(i));
+ }
+ return arr;
} finally {
readUnlock();
}
[... 863 lines stripped ...]