You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [13/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 28 20:05:56 2009
@@ -22,15 +22,16 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
import org.apache.hadoop.security.PermissionChecker;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +40,7 @@
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -52,10 +54,16 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.io.IOUtils;
@@ -89,7 +97,7 @@
* 4) machine --> blocklist (inverted #2)
* 5) LRU cache of updated-heartbeat machines
***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
public static final String AUDIT_FORMAT =
"ugi=%s\t" + // ugi
@@ -123,6 +131,7 @@
public static final Log auditLog = LogFactory.getLog(
FSNamesystem.class.getName() + ".audit");
+ static int BLOCK_DELETION_INCREMENT = 1000;
private boolean isPermissionEnabled;
private UserGroupInformation fsOwner;
private String supergroup;
@@ -199,8 +208,7 @@
private long heartbeatExpireInterval;
//replicationRecheckInterval is how often namenode checks for new replication work
private long replicationRecheckInterval;
- // default block size of a file
- private long defaultBlockSize = 0;
+ private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
@@ -288,7 +296,8 @@
dnthread.start();
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
- conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+ conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
/* If the dns to swith mapping supports cache, resolve network
@@ -302,7 +311,7 @@
}
public static Collection<URI> getNamespaceDirs(Configuration conf) {
- return getStorageDirs(conf, "dfs.name.dir");
+ return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
}
public static Collection<URI> getStorageDirs(Configuration conf,
@@ -314,7 +323,7 @@
// but will retain directories specified in hdfs-site.xml
// When importing image from a checkpoint, the name-node can
// start with empty set of storage directories.
- Configuration cE = new Configuration(false);
+ Configuration cE = new HdfsConfiguration(false);
cE.addResource("core-default.xml");
cE.addResource("core-site.xml");
cE.addResource("hdfs-default.xml");
@@ -353,7 +362,7 @@
}
public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
- return getStorageDirs(conf, "dfs.name.edits.dir");
+ return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
}
/**
@@ -397,34 +406,48 @@
}
LOG.info("fsOwner=" + fsOwner);
- this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
- this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
+ this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+ DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+ this.isPermissionEnabled = conf.getBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
+ DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
LOG.info("supergroup=" + supergroup);
LOG.info("isPermissionEnabled=" + isPermissionEnabled);
- short filePermission = (short)conf.getInt("dfs.upgrade.permission", 00777);
+ short filePermission = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
+ DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
this.heartbeatRecheckInterval = conf.getInt(
- "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
10 * heartbeatInterval;
this.replicationRecheckInterval =
- conf.getInt("dfs.replication.interval", 3) * 1000L;
- this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
- this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+ this.serverDefaults = new FsServerDefaults(
+ conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
+ conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DEFAULT_WRITE_PACKET_SIZE),
+ (short) conf.getInt("dfs.replication", DEFAULT_REPLICATION_FACTOR),
+ conf.getInt("io.file.buffer.size", DEFAULT_FILE_BUFFER_SIZE));
+ this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
20*(int)(heartbeatInterval/1000));
- this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
+ this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean("dfs.support.append", false);
this.isAccessTokenEnabled = conf.getBoolean(
- AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
if (isAccessTokenEnabled) {
this.accessKeyUpdateInterval = conf.getLong(
- AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+ DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
this.accessTokenLifetime = conf.getLong(
- AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
}
LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+ " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
@@ -530,11 +553,22 @@
*/
synchronized void metaSave(String filename) throws IOException {
checkSuperuserPrivilege();
- File file = new File(System.getProperty("hadoop.log.dir"),
- filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(
- new FileWriter(file, true)));
-
+ File file = new File(System.getProperty("hadoop.log.dir"), filename);
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+ true)));
+
+ long totalInodes = this.dir.totalInodes();
+ long totalBlocks = this.getBlocksTotal();
+
+ ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+ ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+ this.DFSNodesStatus(live, dead);
+
+ String str = totalInodes + " files and directories, " + totalBlocks
+ + " blocks = " + (totalInodes + totalBlocks) + " total";
+ out.println(str);
+ out.println("Live Datanodes: "+live.size());
+ out.println("Dead Datanodes: "+dead.size());
blockManager.metaSave(out);
//
@@ -547,7 +581,11 @@
}
long getDefaultBlockSize() {
- return defaultBlockSize;
+ return serverDefaults.getBlockSize();
+ }
+
+ FsServerDefaults getServerDefaults() {
+ return serverDefaults;
}
long getAccessTimePrecision() {
@@ -595,13 +633,18 @@
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
+ BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
}
@@ -684,8 +727,7 @@
/**
* Get block locations within the specified range.
- *
- * @see #getBlockLocations(String, long, long)
+ * @see ClientProtocol#getBlockLocations(String, long, long)
*/
LocatedBlocks getBlockLocations(String clientMachine, String src,
long offset, long length) throws IOException {
@@ -708,18 +750,9 @@
/**
* Get block locations within the specified range.
* @see ClientProtocol#getBlockLocations(String, long, long)
- */
- public LocatedBlocks getBlockLocations(String src, long offset, long length
- ) throws IOException {
- return getBlockLocations(src, offset, length, false);
- }
-
- /**
- * Get block locations within the specified range.
- * @see ClientProtocol#getBlockLocations(String, long, long)
* @throws FileNotFoundException
*/
- public LocatedBlocks getBlockLocations(String src, long offset, long length,
+ LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime) throws IOException {
if (offset < 0) {
throw new IOException("Negative offset is not supported. File: " + src );
@@ -727,11 +760,8 @@
if (length < 0) {
throw new IOException("Negative length is not supported. File: " + src );
}
- INodeFile inode = dir.getFileINode(src);
- if (inode == null)
- throw new FileNotFoundException();
- final LocatedBlocks ret = getBlockLocationsInternal(src, inode,
- offset, length, Integer.MAX_VALUE, doAccessTime);
+ final LocatedBlocks ret = getBlockLocationsInternal(src,
+ offset, length, doAccessTime);
if (auditLog.isInfoEnabled()) {
logAuditEvent(UserGroupInformation.getCurrentUGI(),
Server.getRemoteIp(),
@@ -741,29 +771,196 @@
}
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
- INodeFile inode,
long offset,
long length,
- int nrBlocksToReturn,
boolean doAccessTime
) throws IOException {
- if(inode == null) {
- return null;
- }
+ INodeFile inode = dir.getFileINode(src);
+ if (inode == null)
+ throw new FileNotFoundException();
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
- Block[] blocks = inode.getBlocks();
+ final BlockInfo[] blocks = inode.getBlocks();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+ }
if (blocks == null) {
return null;
}
+
if (blocks.length == 0) {
- return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
+ return new LocatedBlocks(0, inode.isUnderConstruction(),
+ Collections.<LocatedBlock>emptyList(), null, false);
+ } else {
+ final long n = inode.computeFileSize(false);
+ final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+ blocks, offset, length, Integer.MAX_VALUE);
+ final BlockInfo last = inode.getLastBlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("last = " + last);
+ }
+
+ if (!last.isComplete()) {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n), false);
+ }
+ else {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+ }
+ }
+ }
+
+ /** Create a LocatedBlock. */
+ LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+ final long offset, final boolean corrupt) throws IOException {
+ final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+ if (isAccessTokenEnabled) {
+ lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+ EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ }
+ return lb;
+ }
+
+ /**
+ * Moves all the blocks from srcs and appends them to trg.
+ * To avoid rollbacks we will verify validity of ALL of the args
+ * before we start the actual move.
+ * @param target
+ * @param srcs
+ * @throws IOException
+ */
+ public void concat(String target, String [] srcs) throws IOException{
+ FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) + " to " + target);
+ // check safe mode
+ if (isInSafeMode()) {
+ throw new SafeModeException("concat: cannot concat " + target, safeMode);
+ }
+
+ // verify args
+ if(target.isEmpty()) {
+ throw new IllegalArgumentException("concat: trg file name is empty");
+ }
+ if(srcs == null || srcs.length == 0) {
+ throw new IllegalArgumentException("concat: srcs list is empty or null");
+ }
+
+ // currently we require all the files to be in the same dir
+ String trgParent =
+ target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+ for(String s : srcs) {
+ String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+ if(! srcParent.equals(trgParent)) {
+ throw new IllegalArgumentException
+ ("concat: srcs and target shoould be in same dir");
+ }
+ }
+
+ synchronized(this) {
+ // write permission for the target
+ if (isPermissionEnabled) {
+ checkPathAccess(target, FsAction.WRITE);
+
+ // and srcs
+ for(String aSrc: srcs) {
+ checkPathAccess(aSrc, FsAction.READ); // read the file
+ checkParentAccess(aSrc, FsAction.WRITE); // for delete
+ }
+ }
+
+
+ // to make sure no two files are the same
+ Set<INode> si = new HashSet<INode>();
+
+ // we put the following prerequisite for the operation
+ // replication and blocks sizes should be the same for ALL the blocks
+ // check the target
+ INode inode = dir.getFileINode(target);
+
+ if(inode == null) {
+ throw new IllegalArgumentException("concat: trg file doesn't exist");
+ }
+ if(inode.isUnderConstruction()) {
+ throw new IllegalArgumentException("concat: trg file is uner construction");
+ }
+
+ INodeFile trgInode = (INodeFile) inode;
+
+ // per design trg shouldn't be empty and all the blocks same size
+ if(trgInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: "+ target + " file is empty");
+ }
+
+ long blockSize = trgInode.preferredBlockSize;
+
+ // check the end block to be full
+ if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+ throw new IllegalArgumentException(target + " blocks size should be the same");
+ }
+
+ si.add(trgInode);
+ short repl = trgInode.blockReplication;
+
+ // now check the srcs
+ boolean endSrc = false; // final src file doesn't have to have full end block
+ for(int i=0; i<srcs.length; i++) {
+ String src = srcs[i];
+ if(i==srcs.length-1)
+ endSrc=true;
+
+ INodeFile srcInode = dir.getFileINode(src);
+
+ if(src.isEmpty()
+ || srcInode == null
+ || srcInode.isUnderConstruction()
+ || srcInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: file " + src +
+ " is invalid or empty or underConstruction");
+ }
+
+ // check replication and blocks size
+ if(repl != srcInode.blockReplication) {
+ throw new IllegalArgumentException(src + " and " + target + " " +
+ "should have same replication: "
+ + repl + " vs. " + srcInode.blockReplication);
+ }
+
+ //boolean endBlock=false;
+ // verify that all the blocks are of the same length as target
+ // should be enough to check the end blocks
+ int idx = srcInode.blocks.length-1;
+ if(endSrc)
+ idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+ if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+ throw new IllegalArgumentException("concat: blocks sizes of " +
+ src + " and " + target + " should all be the same");
+ }
+
+ si.add(srcInode);
+ }
+
+ // make sure no two files are the same
+ if(si.size() < srcs.length+1) { // trg + srcs
+ // it means at least two files are the same
+ throw new IllegalArgumentException("at least two files are the same");
+ }
+
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+
+ dir.concatInternal(target,srcs);
}
+ getEditLog().logSync();
+
- List<LocatedBlock> results = blockManager.getBlockLocations(blocks,
- offset, length, nrBlocksToReturn);
- return inode.createLocatedBlocks(results);
+ if (auditLog.isInfoEnabled()) {
+ final FileStatus stat = dir.getFileInfo(target);
+ logAuditEvent(UserGroupInformation.getCurrentUGI(),
+ Server.getRemoteIp(),
+ "concat", Arrays.toString(srcs), target, stat);
+ }
+
}
/**
@@ -863,6 +1060,24 @@
return dir.getPreferredBlockSize(filename);
}
+ /*
+ * Verify that parent dir exists
+ */
+ private void verifyParentDir(String src) throws FileAlreadyExistsException,
+ FileNotFoundException {
+ Path parent = new Path(src).getParent();
+ if (parent != null) {
+ INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
+ if (pathINodes[pathINodes.length - 1] == null) {
+ throw new FileNotFoundException("Parent directory doesn't exist: "
+ + parent.toString());
+ } else if (!pathINodes[pathINodes.length - 1].isDirectory()) {
+ throw new FileAlreadyExistsException("Parent path is not a directory: "
+ + parent.toString());
+ }
+ }
+ }
+
/**
* Create a new file entry in the namespace.
*
@@ -873,10 +1088,11 @@
*/
void startFile(String src, PermissionStatus permissions,
String holder, String clientMachine,
- EnumSet<CreateFlag> flag, short replication, long blockSize
+ EnumSet<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize
) throws IOException {
startFileInternal(src, permissions, holder, clientMachine, flag,
- replication, blockSize);
+ createParent, replication, blockSize);
getEditLog().logSync();
if (auditLog.isInfoEnabled()) {
final FileStatus stat = dir.getFileInfo(src);
@@ -891,6 +1107,7 @@
String holder,
String clientMachine,
EnumSet<CreateFlag> flag,
+ boolean createParent,
short replication,
long blockSize
) throws IOException {
@@ -902,6 +1119,7 @@
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ + ", createParent=" + createParent
+ ", replication=" + replication
+ ", overwrite=" + overwrite
+ ", append=" + append);
@@ -928,6 +1146,10 @@
}
}
+ if (!createParent) {
+ verifyParentDir(src);
+ }
+
try {
INode myFile = dir.getFileINode(src);
if (myFile != null && myFile.isUnderConstruction()) {
@@ -936,40 +1158,45 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = leaseManager.getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
+ Lease lease = leaseManager.getLeaseByPath(src);
+ if (lease == null) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
}
//
- // Find the original holder.
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
//
- lease = leaseManager.getLease(pendingFile.clientName);
- if (lease == null) {
+ if (lease.getHolder().equals(holder)) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
}
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
+ "Current lease holder " + lease.getHolder() +
+ " does not match file creator " + pendingFile.getClientName();
//
+ // Current lease holder is different from the requester.
// If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
+ // period, then start lease recovery, otherwise fail.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
+ boolean isClosed = internalReleaseLease(lease, src, null);
+ if(!isClosed)
+ throw new RecoveryInProgressException(
+ "Failed to close file " + src +
+ ". Lease recovery is in progress. Try again later.");
+
+ } else
+ throw new AlreadyBeingCreatedException("failed to create file " +
+ src + " for " + holder + " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
try {
@@ -985,7 +1212,7 @@
else {
//append & create a nonexist file equals to overwrite
this.startFileInternal(src, permissions, holder, clientMachine,
- EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize);
+ EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
return;
}
} else if (myFile.isDirectory()) {
@@ -1022,7 +1249,7 @@
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.clientName, src);
+ leaseManager.addLease(cons.getClientName(), src);
} else {
// Now we can add the name to the filesystem. This file has no
@@ -1038,7 +1265,7 @@
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
- leaseManager.addLease(newNode.clientName, src);
+ leaseManager.addLease(newNode.getClientName(), src);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@@ -1061,7 +1288,7 @@
" Please refer to dfs.support.append configuration parameter.");
}
startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND),
- (short)blockManager.maxReplication, (long)0);
+ false, (short)blockManager.maxReplication, (long)0);
getEditLog().logSync();
//
@@ -1072,40 +1299,36 @@
LocatedBlock lb = null;
synchronized (this) {
INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
- BlockInfo[] blocks = file.getBlocks();
- if (blocks != null && blocks.length > 0) {
- BlockInfo last = blocks[blocks.length-1];
- // this is a redundant search in blocksMap
- // should be resolved by the new implementation of append
- BlockInfo storedBlock = blockManager.getStoredBlock(last);
- assert last == storedBlock : "last block should be in the blocksMap";
- if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+ "last block of the file is not in blocksMap";
+ if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+ DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
// remove the replica locations of this block from the node
for (int i = 0; i < targets.length; i++) {
- targets[i].removeBlock(storedBlock);
+ targets[i].removeBlock(lastBlock);
}
- // set the locations of the last block in the lease record
- file.setLastBlock(storedBlock, targets);
+ // convert last block to under-construction and set its locations
+ blockManager.convertLastBlockToUnderConstruction(file, targets);
- lb = new LocatedBlock(last, targets,
- fileLength-storedBlock.getNumBytes());
+ lb = new LocatedBlock(lastBlock, targets,
+ fileLength-lastBlock.getNumBytes());
if (isAccessTokenEnabled) {
lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
}
// Remove block from replication queue.
- blockManager.updateNeededReplications(last, 0, 0);
+ blockManager.updateNeededReplications(lastBlock, 0, 0);
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
- blockManager.removeFromInvalidates(datanodeId, last);
+ blockManager.removeFromInvalidates(datanodeId, lastBlock);
}
}
}
@@ -1138,8 +1361,17 @@
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
- public LocatedBlock getAdditionalBlock(String src,
- String clientName
+ public LocatedBlock getAdditionalBlock(String src,
+ String clientName,
+ Block previous
+ ) throws IOException {
+ return getAdditionalBlock(src, clientName, previous, null);
+ }
+
+ public LocatedBlock getAdditionalBlock(String src,
+ String clientName,
+ Block previous,
+ HashMap<Node, Node> excludedNodes
) throws IOException {
long fileLength, blockSize;
int replication;
@@ -1159,6 +1391,9 @@
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, previous);
+
//
// If we fail this, bad things happen!
//
@@ -1173,7 +1408,7 @@
// choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
- replication, clientNode, null, blockSize);
+ src, replication, clientNode, excludedNodes, blockSize);
if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
@@ -1195,9 +1430,11 @@
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
// allocate new block record block locations in INode.
- newBlock = allocateBlock(src, pathINodes);
- pendingFile.setTargets(targets);
+ newBlock = allocateBlock(src, pathINodes, targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
@@ -1277,15 +1514,18 @@
COMPLETE_SUCCESS
}
- public CompleteFileStatus completeFile(String src, String holder) throws IOException {
- CompleteFileStatus status = completeFileInternal(src, holder);
+ public CompleteFileStatus completeFile(String src,
+ String holder,
+ Block last) throws IOException {
+ CompleteFileStatus status = completeFileInternal(src, holder, last);
getEditLog().logSync();
return status;
}
-
- private synchronized CompleteFileStatus completeFileInternal(String src,
- String holder) throws IOException {
+ private synchronized CompleteFileStatus completeFileInternal(
+ String src,
+ String holder,
+ Block last) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
if (isInSafeMode())
throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1306,7 +1546,12 @@
("from " + pendingFile.getClientMachine()))
);
return CompleteFileStatus.OPERATION_FAILED;
- } else if (!checkFileProgress(pendingFile, true)) {
+ }
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, last);
+
+ if (!checkFileProgress(pendingFile, true)) {
return CompleteFileStatus.STILL_WAITING;
}
@@ -1339,13 +1584,15 @@
* @param inodes INode representing each of the components of src.
* <code>inodes[inodes.length-1]</code> is the INode for the file.
*/
- private Block allocateBlock(String src, INode[] inodes) throws IOException {
+ private Block allocateBlock(String src,
+ INode[] inodes,
+ DatanodeDescriptor targets[]) throws IOException {
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
}
b.setGenerationStamp(getGenerationStamp());
- b = dir.addBlock(src, inodes, b);
+ b = dir.addBlock(src, inodes, b, targets);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
+src+ ". "+b);
return b;
@@ -1356,13 +1603,16 @@
* replicated. If not, return false. If checkall is true, then check
* all blocks, otherwise check only penultimate block.
*/
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
if (checkall) {
//
// check all blocks of the file.
//
- for (Block block: v.getBlocks()) {
+ for (BlockInfo block: v.getBlocks()) {
if (!blockManager.checkMinReplication(block)) {
+ LOG.info("BLOCK* NameSystem.checkFileProgress: "
+ + "block " + block + " has not reached minimal replication "
+ + blockManager.minReplication);
return false;
}
}
@@ -1370,8 +1620,11 @@
//
// check the penultimate block of this file
//
- Block b = v.getPenultimateBlock();
+ BlockInfo b = v.getPenultimateBlock();
if (b != null && !blockManager.checkMinReplication(b)) {
+ LOG.info("BLOCK* NameSystem.checkFileProgress: "
+ + "block " + b + " has not reached minimal replication "
+ + blockManager.minReplication);
return false;
}
}
@@ -1401,8 +1654,12 @@
// are made, edit namespace and return to client.
////////////////////////////////////////////////////////////////
- /** Change the indicated filename. */
- public boolean renameTo(String src, String dst) throws IOException {
+ /**
+ * Change the indicated filename.
+ * @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
+ */
+ @Deprecated
+ boolean renameTo(String src, String dst) throws IOException {
boolean status = renameToInternal(src, dst);
getEditLog().logSync();
if (status && auditLog.isInfoEnabled()) {
@@ -1414,6 +1671,8 @@
return status;
}
+ /** @deprecated See {@link #renameTo(String, String)} */
+ @Deprecated
private synchronized boolean renameToInternal(String src, String dst
) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
@@ -1439,7 +1698,46 @@
}
return false;
}
+
+
+ /** Rename src to dst */
+ void renameTo(String src, String dst, Options.Rename... options)
+ throws IOException {
+ renameToInternal(src, dst, options);
+ getEditLog().logSync();
+ if (auditLog.isInfoEnabled()) {
+ StringBuilder cmd = new StringBuilder("rename options=");
+ for (Rename option : options) {
+ cmd.append(option.value()).append(" ");
+ }
+ final FileStatus stat = dir.getFileInfo(dst);
+ logAuditEvent(UserGroupInformation.getCurrentUGI(), Server.getRemoteIp(),
+ cmd.toString(), src, dst, stat);
+ }
+ }
+
+ private synchronized void renameToInternal(String src, String dst,
+ Options.Rename... options) throws IOException {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ + src + " to " + dst);
+ }
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot rename " + src, safeMode);
+ }
+ if (!DFSUtil.isValidName(dst)) {
+ throw new IOException("Invalid name: " + dst);
+ }
+ if (isPermissionEnabled) {
+ checkParentAccess(src, FsAction.WRITE);
+ checkAncestorAccess(dst, FsAction.WRITE);
+ }
+ FileStatus dinfo = dir.getFileInfo(dst);
+ dir.renameTo(src, dst, options);
+ changeLease(src, dst, dinfo); // update lease with new filename
+ }
+
/**
* Remove the indicated filename from namespace. If the filename
* is a directory (non empty) and recursive is set to false then throw exception.
@@ -1448,8 +1746,10 @@
if ((!recursive) && (!dir.isDirEmpty(src))) {
throw new IOException(src + " is non empty");
}
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+ }
boolean status = deleteInternal(src, true);
- getEditLog().logSync();
if (status && auditLog.isInfoEnabled()) {
logAuditEvent(UserGroupInformation.getCurrentUGI(),
Server.getRemoteIp(),
@@ -1459,25 +1759,68 @@
}
/**
- * Remove the indicated filename from the namespace. This may
- * invalidate some blocks that make up the file.
+ * Remove a file/directory from the namespace.
+ * <p>
+ * For large directories, deletion is incremental. The blocks under
+ * the directory are collected and deleted a small number at a time holding
+ * the {@link FSNamesystem} lock.
+ * <p>
+ * For a small directory or a file, the deletion is done in one shot.
*/
- synchronized boolean deleteInternal(String src,
+ private boolean deleteInternal(String src,
boolean enforcePermission) throws IOException {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+ boolean deleteNow = false;
+ ArrayList<Block> collectedBlocks = new ArrayList<Block>();
+ synchronized(this) {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot delete " + src, safeMode);
+ }
+ if (enforcePermission && isPermissionEnabled) {
+ checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+ }
+ // Unlink the target directory from directory tree
+ if (!dir.delete(src, collectedBlocks)) {
+ return false;
+ }
+ deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT;
+ if (deleteNow) { // Perform small deletes right away
+ removeBlocks(collectedBlocks);
+ }
}
- if (isInSafeMode())
- throw new SafeModeException("Cannot delete " + src, safeMode);
- if (enforcePermission && isPermissionEnabled) {
- checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+ // Log directory deletion to editlog
+ getEditLog().logSync();
+ if (!deleteNow) {
+ removeBlocks(collectedBlocks); // Incremental deletion of blocks
}
-
- return dir.delete(src) != null;
+ collectedBlocks.clear();
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+ + src +" is removed");
+ }
+ return true;
}
+ /** From the given list, incrementally remove the blocks from blockManager */
+ private void removeBlocks(List<Block> blocks) {
+ int start = 0;
+ int end = 0;
+ while (start < blocks.size()) {
+ end = BLOCK_DELETION_INCREMENT + start;
+ end = end > blocks.size() ? blocks.size() : end;
+ synchronized(this) {
+ for (int i=start; i<end; i++) {
+ blockManager.removeBlock(blocks.get(i));
+ }
+ }
+ start = end;
+ }
+ }
+
void removePathAndBlocks(String src, List<Block> blocks) {
leaseManager.removeLeaseWithPrefixPath(src);
+ if (blocks == null) {
+ return;
+ }
for(Block b : blocks) {
blockManager.removeBlock(b);
}
@@ -1502,9 +1845,9 @@
/**
* Create all the necessary directories
*/
- public boolean mkdirs(String src, PermissionStatus permissions
- ) throws IOException {
- boolean status = mkdirsInternal(src, permissions);
+ public boolean mkdirs(String src, PermissionStatus permissions,
+ boolean createParent) throws IOException {
+ boolean status = mkdirsInternal(src, permissions, createParent);
getEditLog().logSync();
if (status && auditLog.isInfoEnabled()) {
final FileStatus stat = dir.getFileInfo(src);
@@ -1519,7 +1862,7 @@
* Create all the necessary directories
*/
private synchronized boolean mkdirsInternal(String src,
- PermissionStatus permissions) throws IOException {
+ PermissionStatus permissions, boolean createParent) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
if (isPermissionEnabled) {
checkTraverse(src);
@@ -1538,6 +1881,10 @@
checkAncestorAccess(src, FsAction.WRITE);
}
+ if (!createParent) {
+ verifyParentDir(src);
+ }
+
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation migth need to
// create multiple inodes.
@@ -1592,20 +1939,31 @@
* Move a file that is being written to be immutable.
* @param src The filename
* @param lease The lease for the client creating the file
- */
- void internalReleaseLease(Lease lease, String src) throws IOException {
+ * @param recoveryLeaseHolder reassign lease to this holder if the last block
+ * needs recovery; keep current holder if null.
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+ * replication;<br>
+ * RecoveryInProgressException if lease recovery is in progress.<br>
+ * IOException in case of an error.
+ * @return true if file has been successfully finalized and closed or
+ * false if block recovery has been initiated
+ */
+ boolean internalReleaseLease(
+ Lease lease, String src, String recoveryLeaseHolder)
+ throws AlreadyBeingCreatedException,
+ IOException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " file does not exist.";
NameNode.stateChangeLog.warn(message);
throw new IOException(message);
}
if (!iFile.isUnderConstruction()) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " but file is already closed.";
NameNode.stateChangeLog.warn(message);
@@ -1613,39 +1971,123 @@
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+ int nrBlocks = pendingFile.numBlocks();
+ BlockInfo[] blocks = pendingFile.getBlocks();
+
+ int nrCompleteBlocks;
+ BlockInfo curBlock = null;
+ for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+ curBlock = blocks[nrCompleteBlocks];
+ if(!curBlock.isComplete())
+ break;
+ assert blockManager.checkMinReplication(curBlock) :
+ "A COMPLETE block is not minimally replicated in " + src;
+ }
+
+ // If there are no incomplete blocks associated with this file,
+ // then reap lease immediately and close the file.
+ if(nrCompleteBlocks == nrBlocks) {
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ NameNode.stateChangeLog.warn("BLOCK*"
+ + " internalReleaseLease: All existing blocks are COMPLETE,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+
+ // Only the last and the penultimate blocks may be in non COMPLETE state.
+ // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+ if(nrCompleteBlocks < nrBlocks - 2 ||
+ nrCompleteBlocks == nrBlocks - 2 &&
+ curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ + "attempt to release a create lock on "
+ + src + " but file is already closed.";
+ NameNode.stateChangeLog.warn(message);
+ throw new IOException(message);
+ }
- // Initialize lease recovery for pendingFile. If there are no blocks
- // associated with this file, then reap lease immediately. Otherwise
- // renew the lease and trigger lease recovery.
- if (pendingFile.getTargets() == null ||
- pendingFile.getTargets().length == 0) {
- if (pendingFile.getBlocks().length == 0) {
+ // now we know that the last block is not COMPLETE, and
+ // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
+ BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+ BlockUCState lastBlockState = lastBlock.getBlockUCState();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockUCState penultimateBlockState = (penultimateBlock == null ?
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+ assert penultimateBlockState == BlockUCState.COMPLETE ||
+ penultimateBlockState == BlockUCState.COMMITTED :
+ "Unexpected state of penultimate block in " + src;
+
+ switch(lastBlockState) {
+ case COMPLETE:
+ assert false : "Already checked that the last block is incomplete";
+ break;
+ case COMMITTED:
+ // Close file if committed blocks are minimally replicated
+ if(blockManager.checkMinReplication(penultimateBlock) &&
+ blockManager.checkMinReplication(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
- return;
- }
- // setup the Inode.targets for the last block from the blockManager
- //
- BlockInfo[] blocks = pendingFile.getBlocks();
- BlockInfo last = blocks[blocks.length-1];
- DatanodeDescriptor[] targets = blockManager.getNodes(last);
- pendingFile.setTargets(targets);
+ + " internalReleaseLease: Committed blocks are minimally replicated,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+ // Cannot close file right now, since some blocks
+ // are not yet minimally replicated.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ String message = "DIR* NameSystem.internalReleaseLease: " +
+ "Failed to release lease for file " + src +
+ ". Committed blocks are waiting to be minimally replicated." +
+ " Try again later.";
+ NameNode.stateChangeLog.warn(message);
+ throw new AlreadyBeingCreatedException(message);
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ // setup the last block locations from the blockManager if not known
+ if(lastBlock.getNumExpectedLocations() == 0)
+ lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp();
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ lastBlock.initializeBlockRecovery(blockRecoveryId);
+ leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+ break;
}
- // start lease recovery of the last block for this file.
- pendingFile.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
+ return false;
+ }
+
+ Lease reassignLease(Lease lease, String src, String newHolder,
+ INodeFileUnderConstruction pendingFile) {
+ if(newHolder == null)
+ return lease;
+ pendingFile.setClientName(newHolder);
+ return leaseManager.reassignLease(lease, src, newHolder);
}
- private void finalizeINodeFileUnderConstruction(String src,
+
+ private void finalizeINodeFileUnderConstruction(
+ String src,
INodeFileUnderConstruction pendingFile) throws IOException {
- leaseManager.removeLease(pendingFile.clientName, src);
+ leaseManager.removeLease(pendingFile.getClientName(), src);
+
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
// The file is no longer pending.
- // Create permanent INode, update blockmap
+ // Create permanent INode, update blocks
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);
+ // complete last block of the file
+ blockManager.completeBlock(newFile, newFile.numBlocks()-1);
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
@@ -1663,30 +2105,35 @@
+ ", closeFile=" + closeFile
+ ", deleteBlock=" + deleteblock
+ ")");
- final BlockInfo oldblockinfo = blockManager.getStoredBlock(lastblock);
- if (oldblockinfo == null) {
+ final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock);
+ if (storedBlock == null) {
throw new IOException("Block (=" + lastblock + ") not found");
}
- INodeFile iFile = oldblockinfo.getINode();
- if (!iFile.isUnderConstruction()) {
+ INodeFile iFile = storedBlock.getINode();
+ if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
throw new IOException("Unexpected block (=" + lastblock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-
- // Remove old block from blocks map. This always have to be done
- // because the generation stamp of this block is changing.
- blockManager.removeBlockFromMap(oldblockinfo);
+ long recoveryId =
+ ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
if (deleteblock) {
- pendingFile.removeBlock(lastblock);
+ pendingFile.removeLastBlock(lastblock);
+ blockManager.removeBlockFromMap(storedBlock);
}
else {
- // update last block, construct newblockinfo and add it to the blocks map
- lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
- final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+ // update last block
+ storedBlock.setGenerationStamp(newgenerationstamp);
+ storedBlock.setNumBytes(newlength);
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
@@ -1703,13 +2150,11 @@
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < descriptors.length; i++) {
- descriptors[i].addBlock(newblockinfo);
+ descriptors[i].addBlock(storedBlock);
}
- pendingFile.setLastBlock(newblockinfo, null);
- } else {
- // add locations into the INodeUnderConstruction
- pendingFile.setLastBlock(newblockinfo, descriptors);
}
+ // add pipeline locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(storedBlock, descriptors);
}
// If this commit does not want to close the file, persist
@@ -1723,7 +2168,10 @@
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
return;
}
-
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, storedBlock);
+
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
getEditLog().logSync();
@@ -2352,8 +2800,10 @@
void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
- DatanodeDescriptor delNodeHint) {
+ DatanodeDescriptor delNodeHint,
+ BlockPlacementPolicy replicator) {
// first form a rack to datanodes map and
+ INodeFile inode = blockManager.getINode(b);
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
new HashMap<String, ArrayList<DatanodeDescriptor>>();
for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2390,24 +2840,13 @@
boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
DatanodeInfo cur = null;
- long minSpace = Long.MAX_VALUE;
// check if we can del delNodeHint
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
(priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
- Iterator<DatanodeDescriptor> iter =
- priSet.isEmpty() ? remains.iterator() : priSet.iterator();
- while( iter.hasNext() ) {
- DatanodeDescriptor node = iter.next();
- long free = node.getRemaining();
-
- if (minSpace > free) {
- minSpace = free;
- cur = node;
- }
- }
+ cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
}
firstOne = false;
@@ -2701,14 +3140,11 @@
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
LOG.info("Start Decommissioning node " + node.getName());
node.startDecommission();
+ node.decommissioningStatus.setStartTime(now());
//
// all the blocks that reside on this node have to be
// replicated.
- Iterator<? extends Block> decommissionBlocks = node.getBlockIterator();
- while(decommissionBlocks.hasNext()) {
- Block block = decommissionBlocks.next();
- blockManager.updateNeededReplications(block, -1, 0);
- }
+ checkDecommissionStateInternal(node);
}
}
@@ -2831,7 +3267,7 @@
// Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list
if (conf == null)
- conf = new Configuration();
+ conf = new HdfsConfiguration();
hostsReader.updateFileNames(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude", ""));
hostsReader.refresh();
@@ -2984,9 +3420,10 @@
* @param conf configuration
*/
SafeModeInfo(Configuration conf) {
- this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f);
- this.extension = conf.getInt("dfs.safemode.extension", 0);
- this.safeReplication = conf.getInt("dfs.replication.min", 1);
+ this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 0.95f);
+ this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+ this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
this.blockTotal = 0;
this.blockSafe = 0;
}
@@ -3329,7 +3766,7 @@
void setBlockTotal() {
if (safeMode == null)
return;
- safeMode.setBlockTotal((int)getBlocksTotal());
+ safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}
/**
@@ -3340,6 +3777,33 @@
}
/**
+ * Get the total number of COMPLETE blocks in the system.
+ * For safe mode only complete blocks are counted.
+ */
+ long getCompleteBlocksTotal() {
+ // Calculate number of blocks under construction
+ long numUCBlocks = 0;
+ for (Lease lease : leaseManager.getSortedLeases()) {
+ for(String path : lease.getPaths()) {
+ INode node = dir.getFileINode(path);
+ assert node != null : "Found a lease for nonexisting file.";
+ assert node.isUnderConstruction() :
+ "Found a lease for file that is not under construction.";
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+ BlockInfo[] blocks = cons.getBlocks();
+ if(blocks == null)
+ continue;
+ for(BlockInfo b : blocks) {
+ if(!b.isComplete())
+ numUCBlocks++;
+ }
+ }
+ }
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
+ return getBlocksTotal() - numUCBlocks;
+ }
+
+ /**
* Enter safe mode manually.
* @throws IOException
*/
@@ -3658,29 +4122,124 @@
return gs;
}
+ private INodeFileUnderConstruction checkUCBlock(Block block, String clientName)
+ throws IOException {
+ // check safe mode
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot get a new generation stamp and an " +
+ "access token for block " + block, safeMode);
+
+ // check stored block state
+ BlockInfo storedBlock = blockManager.getStoredBlock(block);
+ if (storedBlock == null ||
+ storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+ throw new IOException(block +
+ " does not exist or is not under Construction" + storedBlock);
+ }
+
+ // check file inode
+ INodeFile file = storedBlock.getINode();
+ if (file==null || !file.isUnderConstruction()) {
+ throw new IOException("The file " + storedBlock +
+ " is belonged to does not exist or it is not under construction.");
+ }
+
+ // check lease
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+ if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+ throw new LeaseExpiredException("Lease mismatch: " + block +
+ " is accessed by a non lease holder " + clientName);
+ }
+
+ return pendingFile;
+ }
+
/**
- * Verifies that the block is associated with a file that has a lease.
- * Increments, logs and then returns the stamp
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called for recovering a failed pipeline or setting up
+ * a pipeline to append to a block.
+ *
+ * @param block a block
+ * @param clientName the name of a client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ synchronized LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException {
+ // check validity of parameters
+ checkUCBlock(block, clientName);
+
+ // get a new generation stamp and an access token
+ block.setGenerationStamp(nextGenerationStamp());
+ LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+ if (isAccessTokenEnabled) {
+ locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+ block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return locatedBlock;
+ }
+
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock an old block
+ * @param newBlock a new block with a new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
*/
- synchronized long nextGenerationStampForBlock(Block block) throws IOException {
- BlockInfo storedBlock = blockManager.getStoredBlock(block);
- if (storedBlock == null) {
- String msg = block + " is already commited, storedBlock == null.";
- LOG.info(msg);
+ synchronized void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ + oldBlock + " has different block identifier";
+ LOG.info("updatePipeline(block=" + oldBlock
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ + ", newLength=" + newBlock.getNumBytes()
+ + ", newNodes=" + Arrays.asList(newNodes)
+ + ", clientName=" + clientName
+ + ")");
+
+ // check the validity of the block and lease holder name
+ final INodeFileUnderConstruction pendingFile =
+ checkUCBlock(oldBlock, clientName);
+ final BlockInfoUnderConstruction blockinfo = pendingFile.getLastBlock();
+
+ // check new GS & length: this is not expected
+ if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
+ newBlock.getNumBytes() < blockinfo.getNumBytes()) {
+ String msg = "Update " + oldBlock + " (len = " +
+ blockinfo.getNumBytes() + ") to an older state: " + newBlock +
+ " (len = " + newBlock.getNumBytes() +")";
+ LOG.warn(msg);
throw new IOException(msg);
}
- INodeFile fileINode = storedBlock.getINode();
- if (!fileINode.isUnderConstruction()) {
- String msg = block + " is already commited, !fileINode.isUnderConstruction().";
- LOG.info(msg);
- throw new IOException(msg);
+
+ // Update old block with the new generation stamp and new length
+ blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
+ blockinfo.setNumBytes(newBlock.getNumBytes());
+
+ // find the DatanodeDescriptor objects
+ DatanodeDescriptor[] descriptors = null;
+ if (newNodes.length > 0) {
+ descriptors = new DatanodeDescriptor[newNodes.length];
+ for(int i = 0; i < newNodes.length; i++) {
+ descriptors[i] = getDatanode(newNodes[i]);
+ }
}
- if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
- String msg = block + " is beening recovered, ignoring this request.";
- LOG.info(msg);
- throw new IOException(msg);
+ blockinfo.setExpectedLocations(descriptors);
+
+ // persist blocks only if append is supported
+ String src = leaseManager.findPath(pendingFile);
+ if (supportAppends) {
+ dir.persistBlocks(src, pendingFile);
+ getEditLog().logSync();
}
- return nextGenerationStamp();
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+ return;
}
// rename was successful. If any part of the renamed subtree had
@@ -3783,4 +4342,38 @@
DatanodeDescriptor getDatanode(String nodeID) {
return datanodeMap.get(nodeID);
}
+
+ /**
+ * Return a range of corrupt replica block ids. Up to numExpectedBlocks
+ * blocks starting at the next block after startingBlockId are returned
+ * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
+ * is null, up to numExpectedBlocks blocks are returned from the beginning.
+ * If startingBlockId cannot be found, null is returned.
+ *
+ * @param numExpectedBlocks Number of block ids to return.
+ * 0 <= numExpectedBlocks <= 100
+ * @param startingBlockId Block id from which to start. If null, start at
+ * beginning.
+ * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+ *
+ */
+ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+ Long startingBlockId) {
+ return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
+ startingBlockId);
+ }
+
+ public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+ ArrayList<DatanodeDescriptor> decommissioningNodes =
+ new ArrayList<DatanodeDescriptor>();
+ ArrayList<DatanodeDescriptor> results =
+ getDatanodeListForReport(DatanodeReportType.LIVE);
+ for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ if (node.isDecommissionInProgress()) {
+ decommissioningNodes.add(node);
+ }
+ }
+ return decommissioningNodes;
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Sat Nov 28 20:05:56 2009
@@ -35,6 +35,8 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -81,8 +83,8 @@
final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
xml.declaration();
- final Configuration conf = new Configuration(DataNode.getDataNode().getConf());
- final int socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
+ final Configuration conf = new HdfsConfiguration(DataNode.getDataNode().getConf());
+ final int socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
UnixUserGroupInformation.saveToConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
@@ -99,4 +101,4 @@
xml.endDocument();
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Sat Nov 28 20:05:56 2009
@@ -26,6 +26,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -48,7 +49,7 @@
UserGroupInformation.setCurrentUser(ugi);
final ServletContext context = getServletContext();
- final Configuration conf = new Configuration((Configuration) context.getAttribute("name.conf"));
+ final Configuration conf = new HdfsConfiguration((Configuration) context.getAttribute("name.conf"));
UnixUserGroupInformation.saveToConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sat Nov 28 20:05:56 2009
@@ -25,15 +25,13 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
* We keep an in-memory representation of the file/block hierarchy.
* This is a base INode class containing common fields for file and
* directory inodes.
*/
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
protected byte[] name;
protected INodeDirectory parent;
protected long modificationTime;
@@ -247,6 +245,12 @@
}
/** {@inheritDoc} */
+ public String getFullPathName() {
+ // Get the full path name of this inode.
+ return FSDirectory.getFullPathName(this);
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return "\"" + getLocalName() + "\":" + getPermissionStatus();
}
@@ -417,10 +421,4 @@
}
return null;
}
-
-
- LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
- return new LocatedBlocks(computeContentSummary().getLength(), blocks,
- isUnderConstruction());
- }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Sat Nov 28 20:05:56 2009
@@ -88,7 +88,7 @@
* @param dsQuota diskspace quota to be set
*
*/
- void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+ void setQuota(long newNsQuota, long newDsQuota) {
nsQuota = newNsQuota;
dsQuota = newDsQuota;
}
@@ -116,18 +116,20 @@
*
* @param nsDelta the change of the tree size
* @param dsDelta change to disk space occupied
- * @throws QuotaExceededException if the changed size is greater
- * than the quota
*/
- void updateNumItemsInTree(long nsDelta, long dsDelta) throws
- QuotaExceededException {
- long newCount = nsCount + nsDelta;
- long newDiskspace = diskspace + dsDelta;
- if (nsDelta>0 || dsDelta>0) {
- verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
- }
- nsCount = newCount;
- diskspace = newDiskspace;
+ void updateNumItemsInTree(long nsDelta, long dsDelta) {
+ nsCount += nsDelta;
+ diskspace += dsDelta;
+ }
+
+ /** Update the size of the tree
+ *
+ * @param nsDelta the change of the tree size
+ * @param dsDelta change to disk space occupied
+ **/
+ void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
+ nsCount = nsCount + nsDelta;
+ diskspace = diskspace + dsDelta;
}
/**
@@ -146,14 +148,16 @@
/** Verify if the namespace count disk space satisfies the quota restriction
* @throws QuotaExceededException if the given quota is less than the count
*/
- private static void verifyQuota(long nsQuota, long nsCount,
- long dsQuota, long diskspace)
- throws QuotaExceededException {
- if (nsQuota >= 0 && nsQuota < nsCount) {
- throw new NSQuotaExceededException(nsQuota, nsCount);
- }
- if (dsQuota >= 0 && dsQuota < diskspace) {
- throw new DSQuotaExceededException(dsQuota, diskspace);
+ void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
+   // Quotas are only checked when at least one delta is positive.
+   if (nsDelta > 0 || dsDelta > 0) {
+     final long nsAfter = nsCount + nsDelta;
+     if (nsQuota >= 0 && nsAfter > nsQuota) {
+       throw new NSQuotaExceededException(nsQuota, nsAfter);
+     }
+     final long dsAfter = diskspace + dsDelta;
+     if (dsQuota >= 0 && dsAfter > dsQuota) {
+       throw new DSQuotaExceededException(dsQuota, dsAfter);
+     }
 }
 }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sat Nov 28 20:05:56 2009
@@ -88,6 +88,26 @@
}
/**
+ * append array of blocks to this.blocks
+ */
+ void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+   int size = this.blocks.length;
+   // Room for this file's blocks plus totalAddedBlocks appended ones.
+   BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+   System.arraycopy(this.blocks, 0, newlist, 0, size);
+   // Append each given file's blocks in order; size tracks the fill level.
+   for(INodeFile in: inodes) {
+     System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+     size += in.blocks.length;
+   }
+   // Point every copied block at this file, including the appended ones.
+   for(int i = 0; i < size; i++) {
+     newlist[i].setINode(this);
+   }
+   this.blocks = newlist;
+ }
+
+ /**
* add a block to the block list
*/
void addBlock(BlockInfo newblock) {
@@ -112,8 +132,11 @@
int collectSubtreeBlocksAndClear(List<Block> v) {
parent = null;
- for (Block blk : blocks) {
- v.add(blk);
+ if(blocks != null && v != null) {
+ for (BlockInfo blk : blocks) {
+ v.add(blk);
+ blk.setINode(null);
+ }
}
blocks = null;
return 1;
@@ -121,16 +144,29 @@
/** {@inheritDoc} */
long[] computeContentSummary(long[] summary) {
- long bytes = 0;
- for(Block blk : blocks) {
- bytes += blk.getNumBytes();
- }
- summary[0] += bytes;
+ summary[0] += computeFileSize(true);
summary[1]++;
summary[3] += diskspaceConsumed();
return summary;
}
+ /**
+  * Sum the sizes of this file's blocks.
+  * @param includesBlockInfoUnderConstruction if false, a last block that is
+  *        a BlockInfoUnderConstruction contributes 0 bytes
+  * @return total bytes, or 0 when there are no blocks
+  */
+ long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+   if (blocks == null || blocks.length == 0) {
+     return 0;
+   }
+   final int tail = blocks.length - 1;
+   final boolean skipTail = !includesBlockInfoUnderConstruction
+       && blocks[tail] instanceof BlockInfoUnderConstruction;
+   long total = skipTail ? 0 : blocks[tail].getNumBytes();
+   for(int idx = 0; idx < tail; idx++) { total += blocks[idx].getNumBytes(); }
+   return total;
+ }
@Override
@@ -146,6 +182,9 @@
long diskspaceConsumed(Block[] blkArr) {
long size = 0;
+ if(blkArr == null)
+ return 0;
+
for (Block blk : blkArr) {
if (blk != null) {
size += blk.getNumBytes();
@@ -172,22 +211,33 @@
/**
* Return the penultimate allocated block for this file.
*/
- Block getPenultimateBlock() {
+ BlockInfo getPenultimateBlock() {
if (blocks == null || blocks.length <= 1) {
return null;
}
return blocks[blocks.length - 2];
}
- INodeFileUnderConstruction toINodeFileUnderConstruction(
- String clientName, String clientMachine, DatanodeDescriptor clientNode
- ) throws IOException {
- if (isUnderConstruction()) {
- return (INodeFileUnderConstruction)this;
- }
- return new INodeFileUnderConstruction(name,
- blockReplication, modificationTime, preferredBlockSize,
- blocks, getPermissionStatus(),
- clientName, clientMachine, clientNode);
+ /**
+  * Get the last block of the file, typed as the caller expects.
+  * <p>
+  * T is erased at run time, so the cast below cannot fail here; if the
+  * last block is not a T, the ClassCastException surfaces at the caller's
+  * use of the result, not inside this method.
+  *
+  * @return the last block, or null if the file has no blocks
+  * @throws IOException not thrown by this body; kept for existing callers
+  */
+ <T extends BlockInfo> T getLastBlock() throws IOException {
+   if (blocks == null || blocks.length == 0) {
+     return null;
+   }
+   @SuppressWarnings("unchecked") // T is erased to BlockInfo; see above
+   T lastBlock = (T)blocks[blocks.length - 1];
+   return lastBlock;
+ }
+
+ int numBlocks() {
+ return blocks == null ? 0 : blocks.length;
}
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Nov 28 20:05:56 2009
@@ -21,16 +21,13 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
class INodeFileUnderConstruction extends INodeFile {
- final String clientName; // lease holder
+ private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
- private int primaryNodeIndex = -1; //the node working on lease recovery
- private DatanodeDescriptor[] targets = null; //locations for last block
- private long lastRecoveryTime = 0;
INodeFileUnderConstruction(PermissionStatus permissions,
short replication,
@@ -67,6 +64,10 @@
return clientName;
}
+ void setClientName(String clientName) {
+ this.clientName = clientName;
+ }
+
String getClientMachine() {
return clientMachine;
}
@@ -83,15 +84,6 @@
return true;
}
- DatanodeDescriptor[] getTargets() {
- return targets;
- }
-
- void setTargets(DatanodeDescriptor[] targets) {
- this.targets = targets;
- this.primaryNodeIndex = -1;
- }
-
//
// converts a INodeFileUnderConstruction into a INodeFile
// use the modification time as the access time
@@ -108,10 +100,10 @@
}
/**
- * remove a block from the block list. This block should be
+ * Remove a block from the block list. This block should be
* the last one on the list.
*/
- void removeBlock(Block oldblock) throws IOException {
+ void removeLastBlock(Block oldblock) throws IOException {
if (blocks == null) {
throw new IOException("Trying to delete non-existant block " + oldblock);
}
@@ -124,57 +116,24 @@
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
blocks = newlist;
-
- // Remove the block locations for the last block.
- targets = null;
- }
-
- synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
- ) throws IOException {
- if (blocks == null) {
- throw new IOException("Trying to update non-existant block (newblock="
- + newblock + ")");
- }
- blocks[blocks.length - 1] = newblock;
- setTargets(newtargets);
- lastRecoveryTime = 0;
}
/**
- * Initialize lease recovery for this object
+ * Convert the last block of the file to an under-construction block.
+ * Set its locations.
*/
- void assignPrimaryDatanode() {
- //assign the first alive datanode as the primary datanode
-
- if (targets.length == 0) {
- NameNode.stateChangeLog.warn("BLOCK*"
- + " INodeFileUnderConstruction.initLeaseRecovery:"
- + " No blocks found, lease removed.");
- }
-
- int previous = primaryNodeIndex;
- //find an alive datanode beginning from previous
- for(int i = 1; i <= targets.length; i++) {
- int j = (previous + i)%targets.length;
- if (targets[j].isAlive) {
- DatanodeDescriptor primary = targets[primaryNodeIndex = j];
- primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
- NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
- + " recovery started, primary=" + primary);
- return;
- }
- }
- }
-
- /**
- * Update lastRecoveryTime if expired.
- * @return true if lastRecoveryTimeis updated.
- */
- synchronized boolean setLastRecoveryTime(long now) {
- boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
- if (expired) {
- lastRecoveryTime = now;
- }
- return expired;
+ BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+     DatanodeDescriptor[] targets) throws IOException {
+   // There is no last slot to replace when the file has no blocks.
+   if (blocks == null || blocks.length == 0) {
+     throw new IOException("Trying to update non-existant block. " +
+         "File is empty.");
+   }
+   final BlockInfoUnderConstruction converted = lastBlock
+       .convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, targets);
+   converted.setINode(this);
+   final int lastIndex = numBlocks() - 1;
+   setBlock(lastIndex, converted);
+   return converted;
 }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Sat Nov 28 20:05:56 2009
@@ -102,7 +102,7 @@
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String holder, String src) {
+ synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
@@ -113,6 +113,7 @@
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
+ return lease;
}
/**
@@ -143,11 +144,22 @@
}
/**
+ * Reassign lease for file src to the new holder.
+ */
+ synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+   assert newHolder != null : "new lease holder is null";
+   // removeLease runs first when an old lease is given, then addLease.
+   if (lease != null) { removeLease(lease, src); }
+   final Lease reassigned = addLease(newHolder, src);
+   return reassigned;
+ }
+
+ /**
* Finds the pathname for the specified pendingFile
*/
synchronized String findPath(INodeFileUnderConstruction pendingFile
) throws IOException {
- Lease lease = getLease(pendingFile.clientName);
+ Lease lease = getLease(pendingFile.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
@@ -265,7 +277,11 @@
Collection<String> getPaths() {
return paths;
}
-
+
+ String getHolder() {
+ return holder;
+ }
+
void replacePath(String oldpath, String newpath) {
paths.remove(oldpath);
paths.add(newpath);
@@ -376,7 +392,13 @@
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
- fsnamesystem.internalReleaseLease(oldest, p);
+ if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+ LOG.info("Lease recovery for file " + p +
+ " is complete. File closed.");
+ removing.add(p);
+ } else
+ LOG.info("Started block recovery for file " + p +
+ " lease " + oldest);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);