You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [12/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Nov 28 20:05:56 2009
@@ -17,22 +17,35 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.*;
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
/*************************************************
* FSDirectory stores the filesystem directory state.
@@ -54,7 +67,8 @@
/** Access an existing dfs name directory. */
FSDirectory(FSNamesystem ns, Configuration conf) {
this(new FSImage(), ns, conf);
- if(conf.getBoolean("dfs.name.dir.restore", false)) {
+ if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
+ DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
NameNode.LOG.info("set FSImage.restoreFailedStorage");
fsImage.setRestoreFailedStorage(true);
}
@@ -82,7 +96,7 @@
private void initialize(Configuration conf) {
MetricsContext metricsContext = MetricsUtil.getContext("dfs");
directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
- directoryMetrics.setTag("sessionId", conf.get("session.id"));
+ directoryMetrics.setTag("sessionId", conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY));
}
void loadFSImage(Collection<URI> dataDirs,
@@ -184,7 +198,7 @@
*/
INode unprotectedAddFile( String path,
PermissionStatus permissions,
- Block[] blocks,
+ BlockInfo[] blocks,
short replication,
long modificationTime,
long atime,
@@ -254,7 +268,8 @@
// Add file->block mapping
INodeFile newF = (INodeFile)newNode;
for (int i = 0; i < nrBlocks; i++) {
- newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
+ BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
+ newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
}
}
}
@@ -264,27 +279,39 @@
/**
* Add a block to the file. Returns a reference to the added block.
*/
- Block addBlock(String path, INode[] inodes, Block block
- ) throws QuotaExceededException {
+ BlockInfo addBlock(String path,
+ INode[] inodes,
+ Block block,
+ DatanodeDescriptor targets[]
+ ) throws QuotaExceededException, IOException {
waitForReady();
synchronized (rootDir) {
- INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
+ assert inodes[inodes.length-1].isUnderConstruction() :
+ "INode should correspond to a file under construction";
+ INodeFileUnderConstruction fileINode =
+ (INodeFileUnderConstruction)inodes[inodes.length-1];
// check quota limits and updated space consumed
- updateCount(inodes, inodes.length-1, 0,
- fileNode.getPreferredBlockSize()*fileNode.getReplication());
-
- // associate the new list of blocks with this file
- BlockInfo blockInfo = getBlockManager().addINode(block, fileNode);
- fileNode.addBlock(blockInfo);
+ updateCount(inodes, inodes.length-1, 0,
+ fileINode.getPreferredBlockSize()*fileINode.getReplication(), true);
+
+ // associate new last block for the file
+ BlockInfoUnderConstruction blockInfo =
+ new BlockInfoUnderConstruction(
+ block,
+ fileINode.getReplication(),
+ BlockUCState.UNDER_CONSTRUCTION,
+ targets);
+ getBlockManager().addINode(blockInfo, fileINode);
+ fileINode.addBlock(blockInfo);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ path + " with " + block
+ " block is added to the in-memory "
+ "file system");
+ return blockInfo;
}
- return block;
}
/**
@@ -328,7 +355,7 @@
synchronized (rootDir) {
// modify file-> block and blocksMap
- fileNode.removeBlock(block);
+ fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
getBlockManager().removeFromCorruptReplicasMap(block);
@@ -344,7 +371,9 @@
/**
* @see #unprotectedRenameTo(String, String, long)
+ * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
*/
+ @Deprecated
boolean renameTo(String src, String dst) throws QuotaExceededException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
@@ -358,24 +387,44 @@
return true;
}
- /** Change a path name
+ /**
+ * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
+ */
+ void renameTo(String src, String dst, Options.Rename... options)
+ throws IOException {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
+ + " to " + dst);
+ }
+ waitForReady();
+ long now = FSNamesystem.now();
+ unprotectedRenameTo(src, dst, now, options);
+ fsImage.getEditLog().logRename(src, dst, now, options);
+ }
+
+ /**
+ * Change a path name
*
* @param src source path
* @param dst destination path
* @return true if rename succeeds; false otherwise
* @throws QuotaExceededException if the operation violates any quota limit
+ * @deprecated See {@link #renameTo(String, String)}
*/
- boolean unprotectedRenameTo(String src, String dst, long timestamp)
- throws QuotaExceededException {
+ @Deprecated
+ boolean unprotectedRenameTo(String src, String dst, long timestamp)
+ throws QuotaExceededException {
synchronized (rootDir) {
INode[] srcInodes = rootDir.getExistingPathINodes(src);
// check the validation of the source
if (srcInodes[srcInodes.length-1] == null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst+ " because source does not exist");
+ + "failed to rename " + src + " to " + dst
+ + " because source does not exist");
return false;
- } else if (srcInodes.length == 1) {
+ }
+ if (srcInodes.length == 1) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+"failed to rename "+src+" to "+dst+ " because source is the root");
return false;
@@ -384,71 +433,243 @@
dst += Path.SEPARATOR + new Path(src).getName();
}
- // remove source
- INode srcChild = null;
- try {
- srcChild = removeChild(srcInodes, srcInodes.length-1);
- } catch (IOException e) {
- // srcChild == null; go to next if statement
+ // check the validity of the destination
+ if (dst.equals(src)) {
+ return true;
}
- if (srcChild == null) {
+ // dst cannot be directory or a file under src
+ if (dst.startsWith(src) &&
+ dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
+ + "failed to rename " + src + " to " + dst
+ + " because destination starts with src");
return false;
}
-
- String srcChildName = srcChild.getLocalName();
- // check the validity of the destination
- INode dstChild = null;
- QuotaExceededException failureByQuota = null;
-
byte[][] dstComponents = INode.getPathComponents(dst);
INode[] dstInodes = new INode[dstComponents.length];
rootDir.getExistingPathINodes(dstComponents, dstInodes);
- if (dstInodes[dstInodes.length-1] != null) { //check if destination exists
+ if (dstInodes[dstInodes.length-1] != null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+"failed to rename "+src+" to "+dst+
" because destination exists");
- } else if (dstInodes[dstInodes.length-2] == null) { // check if its parent exists
+ return false;
+ }
+ if (dstInodes[dstInodes.length-2] == null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+"failed to rename "+src+" to "+dst+
- " because destination's parent does not exists");
+ " because destination's parent does not exist");
+ return false;
}
- else {
- // add to the destination
+
+ // Ensure dst has quota to accommodate rename
+ verifyQuotaForRename(srcInodes,dstInodes);
+
+ INode dstChild = null;
+ INode srcChild = null;
+ String srcChildName = null;
+ try {
+ // remove src
+ srcChild = removeChild(srcInodes, srcInodes.length-1);
+ if (srcChild == null) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + "failed to rename " + src + " to " + dst
+ + " because the source can not be removed");
+ return false;
+ }
+ srcChildName = srcChild.getLocalName();
srcChild.setLocalName(dstComponents[dstInodes.length-1]);
- try {
- // add it to the namespace
- dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
- } catch (QuotaExceededException qe) {
- failureByQuota = qe;
+
+ // add src to the destination
+ dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+ srcChild, -1, false);
+ if (dstChild != null) {
+ srcChild = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+ + src + " is renamed to " + dst);
+ }
+ // update modification time of dst and the parent of src
+ srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+ dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+ return true;
+ }
+ } finally {
+ if (dstChild == null && srcChild != null) {
+ // put it back
+ srcChild.setLocalName(srcChildName);
+ addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, -1,
+ false);
}
}
- if (dstChild != null) {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
- +src+" is renamed to "+dst);
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst);
+ return false;
+ }
+ }
+
+ /**
+ * Rename src to dst.
+ * See {@link DistributedFileSystem#rename(Path, Path, Options.Rename...)}
+ * for details related to rename semantics.
+ *
+ * @param src source path
+ * @param dst destination path
+ * @param timestamp modification time
+ * @param options Rename options
+ * @throws IOException if the operation violates any quota limit
+ */
+ void unprotectedRenameTo(String src, String dst, long timestamp,
+ Options.Rename... options) throws IOException {
+ boolean overwrite = false;
+ if (null != options) {
+ for (Rename option : options) {
+ if (option == Rename.OVERWRITE) {
+ overwrite = true;
}
+ }
+ }
+ String error = null;
+ synchronized (rootDir) {
+ final INode[] srcInodes = rootDir.getExistingPathINodes(src);
+ final INode srcInode = srcInodes[srcInodes.length - 1];
+ // validate source
+ if (srcInode == null) {
+ error = "rename source " + src + " is not found.";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new FileNotFoundException(error);
+ }
+ if (srcInodes.length == 1) {
+ error = "rename source cannot be the root";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
- // update modification time of dst and the parent of src
- srcInodes[srcInodes.length-2].setModificationTime(timestamp);
- dstInodes[dstInodes.length-2].setModificationTime(timestamp);
- return true;
- } else {
+ // validate of the destination
+ if (dst.equals(src)) {
+ return;
+ }
+ // dst cannot be a directory or a file under src
+ if (dst.startsWith(src) &&
+ dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+ error = "Rename destination " + dst
+ + " is a directory or file under source " + src;
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
- +"failed to rename "+src+" to "+dst);
- try {
- // put it back
- srcChild.setLocalName(srcChildName);
- addChild(srcInodes, srcInodes.length-1, srcChild, false);
- } catch (IOException ignored) {}
- if (failureByQuota != null) {
- throw failureByQuota;
- } else {
- return false;
+ + error);
+ throw new IOException(error);
+ }
+ final byte[][] dstComponents = INode.getPathComponents(dst);
+ final INode[] dstInodes = new INode[dstComponents.length];
+ rootDir.getExistingPathINodes(dstComponents, dstInodes);
+ INode dstInode = dstInodes[dstInodes.length - 1];
+ if (dstInodes.length == 1) {
+ error = "rename destination cannot be the root";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ if (dstInode != null) { // Destination exists
+ if (dstInode.isDirectory() != srcInode.isDirectory()) {
+ error = "Source " + src + " Destination " + dst
+ + " both should be either file or directory";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ if (!overwrite) { // If destination exists, overwrite flag must be true
+ error = "rename destination " + dst + " already exists";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new FileAlreadyExistsException(error);
+ }
+ List<INode> children = dstInode.isDirectory() ?
+ ((INodeDirectory) dstInode).getChildrenRaw() : null;
+ if (children != null && children.size() != 0) {
+ error = "rename cannot overwrite non empty destination directory "
+ + dst;
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ }
+ if (dstInodes[dstInodes.length - 2] == null) {
+ error = "rename destination parent " + dst + " not found.";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new FileNotFoundException(error);
+ }
+ if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+ error = "rename destination parent " + dst + " is a file.";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new ParentNotDirectoryException(error);
+ }
+
+ // Ensure dst has quota to accommodate rename
+ verifyQuotaForRename(srcInodes, dstInodes);
+ INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+ if (removedSrc == null) {
+ error = "Failed to rename " + src + " to " + dst
+ + " because the source can not be removed";
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + error);
+ throw new IOException(error);
+ }
+ final String srcChildName = removedSrc.getLocalName();
+ String dstChildName = null;
+ INode removedDst = null;
+ try {
+ if (dstInode != null) { // dst exists remove it
+ removedDst = removeChild(dstInodes, dstInodes.length - 1);
+ dstChildName = removedDst.getLocalName();
+ }
+
+ INode dstChild = null;
+ removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+ // add src as dst to complete rename
+ dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+ removedSrc, -1, false);
+
+ if (dstChild != null) {
+ removedSrc = null;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog
+ .debug("DIR* FSDirectory.unprotectedRenameTo: " + src
+ + " is renamed to " + dst);
+ }
+ srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+ dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+ // Collect the blocks and remove the lease for previous dst
+ if (removedDst != null) {
+ INode rmdst = removedDst;
+ removedDst = null;
+ List<Block> collectedBlocks = new ArrayList<Block>();
+ int filecount = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+ incrDeletedFileCount(filecount);
+ getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+ }
+ return;
+ }
+ } finally {
+ if (removedSrc != null) {
+ // Rename failed - restore src
+ removedSrc.setLocalName(srcChildName);
+ addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, -1,
+ false);
+ }
+ if (removedDst != null) {
+ // Rename failed - restore dst
+ removedDst.setLocalName(dstChildName);
+ addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, -1,
+ false);
}
}
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ + "failed to rename " + src + " to " + dst);
+ throw new IOException("rename from " + src + " to " + dst + " failed.");
}
}
@@ -493,7 +714,7 @@
// check disk quota
long dsDelta = (replication - oldReplication[0]) *
(fileNode.diskspaceConsumed()/oldReplication[0]);
- updateCount(inodes, inodes.length-1, 0, dsDelta);
+ updateCount(inodes, inodes.length-1, 0, dsDelta, true);
fileNode.setReplication(replication);
fileBlocks = fileNode.getBlocks();
@@ -568,21 +789,92 @@
}
}
}
+
+ /**
+ *
+ * @param target
+ * @param srcs
+ * @throws IOException
+ */
+ public void concatInternal(String target, String [] srcs) throws IOException{
+ synchronized(rootDir) {
+ // actual move
+ waitForReady();
+
+ unprotectedConcat(target, srcs);
+ // do the commit
+ fsImage.getEditLog().logConcat(target, srcs, FSNamesystem.now());
+ }
+ }
+
+
+
+ /**
+ * Concat all the blocks from srcs to trg
+ * and delete the srcs files
+ * @param trg target file to move the blocks to
+ * @param srcs list of file to move the blocks from
+ * Must be public because also called from EditLogs
+ * NOTE: - it does not update quota (not needed for concat)
+ */
+ public void unprotectedConcat(String target, String [] srcs) throws IOException {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ }
+ // do the move
+
+ INode [] trgINodes = getExistingPathINodes(target);
+ INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+ INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
+
+ INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+ int i = 0;
+ int totalBlocks = 0;
+ for(String src : srcs) {
+ INodeFile srcInode = getFileINode(src);
+ allSrcInodes[i++] = srcInode;
+ totalBlocks += srcInode.blocks.length;
+ }
+ trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+ // since we are in the same dir - we can use same parent to remove files
+ int count = 0;
+ for(INodeFile nodeToRemove: allSrcInodes) {
+ if(nodeToRemove == null) continue;
+
+ nodeToRemove.blocks = null;
+ trgParent.removeChild(nodeToRemove);
+ count++;
+ }
+
+ long now = FSNamesystem.now();
+ trgInode.setModificationTime(now);
+ trgParent.setModificationTime(now);
+ // update quota on the parent directory ('count' files removed, 0 space)
+ unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+ }
+
/**
- * Remove the file from management, return blocks
+ * Delete the target directory and collect the blocks under it
+ *
+ * @param src Path of a directory to delete
+ * @param collectedBlocks Blocks under the deleted directory
+ * @return true on successful deletion; else false
*/
- INode delete(String src) {
+ boolean delete(String src, List<Block>collectedBlocks) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
waitForReady();
long now = FSNamesystem.now();
- INode deletedNode = unprotectedDelete(src, now);
- if (deletedNode != null) {
- fsImage.getEditLog().logDelete(src, now);
+ INode removedNode = unprotectedDelete(src, collectedBlocks, now);
+ if (removedNode == null) {
+ return false;
}
- return deletedNode;
+ // Blocks will be deleted later by the caller of this method
+ getFSNamesystem().removePathAndBlocks(src, null);
+ fsImage.getEditLog().logDelete(src, now);
+ return true;
}
/** Return if a directory is empty or not **/
@@ -608,12 +900,30 @@
/**
* Delete a path from the name space
* Update the count at each ancestor directory with quota
+ * <br>
+ * Note: This is to be used by {@link FSEditLog} only.
+ * <br>
+ * @param src a string representation of a path to an inode
+ * @param mtime the time the inode is removed
+ * @return deleted inode if deletion succeeds; else null
+ */
+ INode unprotectedDelete(String src, long mtime) {
+ List<Block> collectedBlocks = new ArrayList<Block>();
+ INode removedNode = unprotectedDelete(src, collectedBlocks, mtime);
+ getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+ return removedNode;
+ }
+
+ /**
+ * Delete a path from the name space
+ * Update the count at each ancestor directory with quota
* @param src a string representation of a path to an inode
- * @param modificationTime the time the inode is removed
- * @param deletedBlocks the place holder for the blocks to be removed
- * @return if the deletion succeeds
+ * @param collectedBlocks blocks collected from the deleted path
+ * @param mtime the time the inode is removed
+ * @return deleted inode if deletion succeeds; else null
*/
- INode unprotectedDelete(String src, long modificationTime) {
+ INode unprotectedDelete(String src, List<Block> collectedBlocks,
+ long mtime) {
src = normalizePath(src);
synchronized (rootDir) {
@@ -624,33 +934,28 @@
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+"failed to remove "+src+" because it does not exist");
return null;
- } else if (inodes.length == 1) { // src is the root
+ }
+ if (inodes.length == 1) { // src is the root
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
"failed to remove " + src +
" because the root is not allowed to be deleted");
return null;
- } else {
- try {
- // Remove the node from the namespace
- removeChild(inodes, inodes.length-1);
- // set the parent's modification time
- inodes[inodes.length-2].setModificationTime(modificationTime);
- // GC all the blocks underneath the node.
- ArrayList<Block> v = new ArrayList<Block>();
- int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
- incrDeletedFileCount(filesRemoved);
- getFSNamesystem().removePathAndBlocks(src, v);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
- +src+" is removed");
- }
- return targetNode;
- } catch(QuotaExceededException e) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
- "failed to remove " + src + " because " + e.getMessage());
- return null;
- }
}
+ int pos = inodes.length - 1;
+ // Remove the node from the namespace
+ targetNode = removeChild(inodes, pos);
+ if (targetNode == null) {
+ return null;
+ }
+ // set the parent's modification time
+ inodes[pos-1].setModificationTime(mtime);
+ int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+ incrDeletedFileCount(filesRemoved);
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+ +src+" is removed");
+ }
+ return targetNode;
}
}
@@ -699,7 +1004,7 @@
}
int index = 0;
- for (Block b : newnode.getBlocks()) {
+ for (BlockInfo b : newnode.getBlocks()) {
BlockInfo info = getBlockManager().addINode(b, newnode);
newnode.setBlock(index, info); // inode refers to the block in BlocksMap
index++;
@@ -844,7 +1149,7 @@
throw new FileNotFoundException(path +
" does not exist under rootDir.");
}
- updateCount(inodes, len-1, nsDelta, dsDelta);
+ updateCount(inodes, len-1, nsDelta, dsDelta, true);
}
}
@@ -854,10 +1159,11 @@
* @param numOfINodes the number of inodes to update starting from index 0
* @param nsDelta the delta change of namespace
* @param dsDelta the delta change of diskspace
+ * @param checkQuota if true then check if quota is exceeded
* @throws QuotaExceededException if the new count violates any quota limit
*/
private void updateCount(INode[] inodes, int numOfINodes,
- long nsDelta, long dsDelta)
+ long nsDelta, long dsDelta, boolean checkQuota)
throws QuotaExceededException {
if (!ready) {
//still initializing. do not check or update quotas.
@@ -866,28 +1172,45 @@
if (numOfINodes>inodes.length) {
numOfINodes = inodes.length;
}
- // check existing components in the path
- int i=0;
- try {
- for(; i < numOfINodes; i++) {
- if (inodes[i].isQuotaSet()) { // a directory with quota
- INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
- node.updateNumItemsInTree(nsDelta, dsDelta);
- }
+ if (checkQuota) {
+ verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
+ }
+ for(int i = 0; i < numOfINodes; i++) {
+ if (inodes[i].isQuotaSet()) { // a directory with quota
+ INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
+ node.updateNumItemsInTree(nsDelta, dsDelta);
}
+ }
+ }
+
+ /**
+ * update quota of each inode and check to see if quota is exceeded.
+ * See {@link #updateCount(INode[], int, long, long, boolean)}
+ */
+ private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes,
+ long nsDelta, long dsDelta) {
+ try {
+ updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
} catch (QuotaExceededException e) {
- e.setPathName(getFullPathName(inodes, i));
- // undo updates
- for( ; i-- > 0; ) {
- try {
- if (inodes[i].isQuotaSet()) { // a directory with quota
- INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
- node.updateNumItemsInTree(-nsDelta, -dsDelta);
- }
- } catch (IOException ingored) {
- }
+ NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
+ }
+ }
+
+ /**
+ * updates quota without verification
+ * callers responsibility is to make sure quota is not exceeded
+ * @param inodes
+ * @param numOfINodes
+ * @param nsDelta
+ * @param dsDelta
+ */
+ void unprotectedUpdateCount(INode[] inodes, int numOfINodes,
+ long nsDelta, long dsDelta) {
+ for(int i=0; i < numOfINodes; i++) {
+ if (inodes[i].isQuotaSet()) { // a directory with quota
+ INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
+ node.unprotectedUpdateNumItemsInTree(nsDelta, dsDelta);
}
- throw e;
}
}
@@ -899,6 +1222,23 @@
}
return fullPathName.toString();
}
+
+ /** Return the full path name of the specified inode */
+ static String getFullPathName(INode inode) {
+ // calculate the depth of this inode from root
+ int depth = 0;
+ for (INode i = inode; i != null; i = i.parent) {
+ depth++;
+ }
+ INode[] inodes = new INode[depth];
+
+ // fill up the inodes in the path from this inode to root
+ for (int i = 0; i < depth; i++) {
+ inodes[depth-i-1] = inode;
+ inode = inode.parent;
+ }
+ return getFullPathName(inodes, depth-1);
+ }
/**
* Create a directory
@@ -917,7 +1257,7 @@
*/
boolean mkdirs(String src, PermissionStatus permissions,
boolean inheritPermission, long now)
- throws FileNotFoundException, QuotaExceededException {
+ throws FileAlreadyExistsException, QuotaExceededException {
src = normalizePath(src);
String[] names = INode.getPathNames(src);
byte[][] components = INode.getPathComponents(names);
@@ -932,7 +1272,7 @@
for(; i < inodes.length && inodes[i] != null; i++) {
pathbuilder.append(Path.SEPARATOR + names[i]);
if (!inodes[i].isDirectory()) {
- throw new FileNotFoundException("Parent path is not a directory: "
+ throw new FileAlreadyExistsException("Parent path is not a directory: "
+ pathbuilder);
}
}
@@ -981,7 +1321,7 @@
long timestamp) throws QuotaExceededException {
inodes[pos] = addChild(inodes, pos,
new INodeDirectory(name, permission, timestamp),
- inheritPermission );
+ -1, inheritPermission );
}
/** Add a node child to the namespace. The full path name of the node is src.
@@ -999,48 +1339,142 @@
inheritPermission);
}
}
+
+ /**
+ * Verify quota for adding or moving a new INode with required
+ * namespace and diskspace to a given position.
+ *
+ * @param inodes INodes corresponding to a path
+ * @param pos position where a new INode will be added
+ * @param nsDelta needed namespace
+ * @param dsDelta needed diskspace
+ * @param commonAncestor Last node in inodes array that is a common ancestor
+ * for a INode that is being moved from one location to the other.
+ * Pass null if a node is not being moved.
+ * @throws QuotaExceededException if quota limit is exceeded.
+ */
+ private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
+ INode commonAncestor) throws QuotaExceededException {
+ if (!ready) {
+ // Do not check quota if edits log is still being processed
+ return;
+ }
+ if (nsDelta <= 0 && dsDelta <= 0) {
+ // if quota is being freed or not being consumed
+ return;
+ }
+ if (pos>inodes.length) {
+ pos = inodes.length;
+ }
+ int i = pos - 1;
+ try {
+ // check existing components in the path
+ for(; i >= 0; i--) {
+ if (commonAncestor == inodes[i]) {
+ // Moving an existing node. Stop checking for quota when common
+ // ancestor is reached
+ return;
+ }
+ if (inodes[i].isQuotaSet()) { // a directory with quota
+ INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
+ node.verifyQuota(nsDelta, dsDelta);
+ }
+ }
+ } catch (QuotaExceededException e) {
+ e.setPathName(getFullPathName(inodes, i));
+ throw e;
+ }
+ }
- /** Add a node child to the inodes at index pos.
- * Its ancestors are stored at [0, pos-1].
- * QuotaExceededException is thrown if it violates quota limit */
- private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
- boolean inheritPermission) throws QuotaExceededException {
- return addChild(pathComponents, pos, child, -1, inheritPermission);
+ /**
+ * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
+ * dstInodes[dstInodes.length-1]
+ *
+ * @param srcInodes directory from where node is being moved.
+ * @param dstInodes directory to where node is moved to.
+ * @throws QuotaExceededException if quota limit is exceeded.
+ */
+ private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
+ throws QuotaExceededException {
+ if (!ready) {
+ // Do not check quota if edits log is still being processed
+ return;
+ }
+ INode srcInode = srcInodes[srcInodes.length - 1];
+ INode commonAncestor = null;
+ for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
+ commonAncestor = srcInodes[i];
+ }
+ INode.DirCounts srcCounts = new INode.DirCounts();
+ srcInode.spaceConsumedInTree(srcCounts);
+ long nsDelta = srcCounts.getNsCount();
+ long dsDelta = srcCounts.getDsCount();
+
+ // Reduce the required quota by dst that is being removed
+ INode dstInode = dstInodes[dstInodes.length - 1];
+ if (dstInode != null) {
+ INode.DirCounts dstCounts = new INode.DirCounts();
+ dstInode.spaceConsumedInTree(dstCounts);
+ nsDelta -= dstCounts.getNsCount();
+ dsDelta -= dstCounts.getDsCount();
+ }
+ verifyQuota(dstInodes, dstInodes.length - 1, nsDelta, dsDelta,
+ commonAncestor);
}
/** Add a node child to the inodes at index pos.
* Its ancestors are stored at [0, pos-1].
* QuotaExceededException is thrown if it violates quota limit */
- private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
- long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
+ private <T extends INode> T addChild(INode[] pathComponents, int pos,
+ T child, long childDiskspace, boolean inheritPermission,
+ boolean checkQuota) throws QuotaExceededException {
INode.DirCounts counts = new INode.DirCounts();
child.spaceConsumedInTree(counts);
if (childDiskspace < 0) {
childDiskspace = counts.getDsCount();
}
- updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
+ updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
+ checkQuota);
T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
child, inheritPermission);
if (addedNode == null) {
- updateCount(pathComponents, pos,
- -counts.getNsCount(), -childDiskspace);
+ updateCount(pathComponents, pos, -counts.getNsCount(),
+ -childDiskspace, true);
}
return addedNode;
}
+
+ private <T extends INode> T addChild(INode[] pathComponents, int pos,
+ T child, long childDiskspace, boolean inheritPermission)
+ throws QuotaExceededException {
+ return addChild(pathComponents, pos, child, childDiskspace,
+ inheritPermission, true);
+ }
+
+ private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+ int pos, T child, long childDiskspace, boolean inheritPermission) {
+ T inode = null;
+ try {
+ inode = addChild(pathComponents, pos, child, childDiskspace,
+ inheritPermission, false);
+ } catch (QuotaExceededException e) {
+ NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
+ }
+ return inode;
+ }
/** Remove an inode at index pos from the namespace.
* Its ancestors are stored at [0, pos-1].
* Count of each ancestor with quota is also updated.
* Return the removed node; null if the removal fails.
*/
- private INode removeChild(INode[] pathComponents, int pos)
- throws QuotaExceededException {
+ private INode removeChild(INode[] pathComponents, int pos) {
INode removedNode =
((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
if (removedNode != null) {
INode.DirCounts counts = new INode.DirCounts();
removedNode.spaceConsumedInTree(counts);
- updateCount(pathComponents, pos,
+ updateCountNoQuotaCheck(pathComponents, pos,
-counts.getNsCount(), -counts.getDsCount());
}
return removedNode;
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Nov 28 20:05:56 2009
@@ -24,14 +24,19 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -42,12 +47,11 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
-import org.mortbay.log.Log;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -56,7 +60,7 @@
public class FSEditLog {
public static final byte OP_INVALID = -1;
private static final byte OP_ADD = 0;
- private static final byte OP_RENAME = 1; // rename
+ private static final byte OP_RENAME_OLD = 1; // rename
private static final byte OP_DELETE = 2; // delete
private static final byte OP_MKDIR = 3; // create directory
private static final byte OP_SET_REPLICATION = 4; // set replication
@@ -73,6 +77,9 @@
private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
private static final byte OP_TIMES = 13; // sets mod & access time on a file
private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+ private static final byte OP_RENAME = 15; // new rename
+ private static final byte OP_CONCAT_DELETE = 16; // concat files.
+
/*
* The following operations are used to control remote edit log streams,
* and not logged into file streams.
@@ -407,6 +414,7 @@
return numEdits;
}
+ @SuppressWarnings("deprecation")
int loadEditRecords(int logVersion, DataInputStream in,
boolean closeOnExit) throws IOException {
FSNamesystem fsNamesys = fsimage.getFSNamesystem();
@@ -416,9 +424,9 @@
String clientMachine = null;
String path = null;
int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
- numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
+ numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
- numOpTimes = 0, numOpOther = 0;
+ numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, numOpOther = 0;
try {
while (true) {
long timestamp = 0;
@@ -461,19 +469,9 @@
blockSize = readLong(in);
}
// get blocks
- Block blocks[] = null;
- if (logVersion <= -14) {
- blocks = readBlocks(in);
- } else {
- BlockTwo oldblk = new BlockTwo();
- int num = in.readInt();
- blocks = new Block[num];
- for (int i = 0; i < num; i++) {
- oldblk.readFields(in);
- blocks[i] = new Block(oldblk.blkid, oldblk.len,
- Block.GRANDFATHER_GENERATION_STAMP);
- }
- }
+ boolean isFileUnderConstruction = (opcode == OP_ADD);
+ BlockInfo blocks[] =
+ readBlocks(in, logVersion, isFileUnderConstruction, replication);
// Older versions of HDFS does not store the block size in inode.
// If the file has more than one block, use the size of the
@@ -521,7 +519,7 @@
path, permissions,
blocks, replication,
mtime, atime, blockSize);
- if (opcode == OP_ADD) {
+ if (isFileUnderConstruction) {
numOpAdd++;
//
// Replace current node with a INodeUnderConstruction.
@@ -538,7 +536,7 @@
clientMachine,
null);
fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.clientName, path);
+ fsNamesys.leaseManager.addLease(cons.getClientName(), path);
}
break;
}
@@ -549,8 +547,29 @@
fsDir.unprotectedSetReplication(path, replication, null);
break;
}
- case OP_RENAME: {
- numOpRename++;
+ case OP_CONCAT_DELETE: {
+ if (logVersion > -22) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ }
+ numOpConcatDelete++;
+ int length = in.readInt();
+ if (length < 3) { // trg, srcs.., timestamp
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
+ }
+ String trg = FSImage.readString(in);
+ int srcSize = length - 1 - 1; //trg and timestamp
+ String [] srcs = new String [srcSize];
+ for(int i=0; i<srcSize;i++) {
+ srcs[i]= FSImage.readString(in);
+ }
+ timestamp = readLong(in);
+ fsDir.unprotectedConcat(trg, srcs);
+ break;
+ }
+ case OP_RENAME_OLD: {
+ numOpRenameOld++;
int length = in.readInt();
if (length != 3) {
throw new IOException("Incorrect data format. "
@@ -681,6 +700,26 @@
fsDir.unprotectedSetTimes(path, mtime, atime, true);
break;
}
+ case OP_RENAME: {
+ if (logVersion > -21) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ }
+ numOpRename++;
+ int length = in.readInt();
+ if (length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
+ }
+ String s = FSImage.readString(in);
+ String d = FSImage.readString(in);
+ timestamp = readLong(in);
+ Rename[] options = readRenameOptions(in);
+ FileStatus dinfo = fsDir.getFileInfo(d);
+ fsDir.unprotectedRenameTo(s, d, timestamp, options);
+ fsNamesys.changeLease(s, d, dinfo);
+ break;
+ }
default: {
throw new IOException("Never seen opcode " + opcode);
}
@@ -692,12 +731,15 @@
}
if (FSImage.LOG.isDebugEnabled()) {
FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose
- + " numOpDelete = " + numOpDelete + " numOpRename = " + numOpRename
+ + " numOpDelete = " + numOpDelete
+ + " numOpRenameOld = " + numOpRenameOld
+ " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
+ " numOpSetPerm = " + numOpSetPerm
+ " numOpSetOwner = " + numOpSetOwner
+ " numOpSetGenStamp = " + numOpSetGenStamp
+ " numOpTimes = " + numOpTimes
+ + " numOpConcatDelete = " + numOpConcatDelete
+ + " numOpRename = " + numOpRename
+ " numOpOther = " + numOpOther);
}
return numEdits;
@@ -737,7 +779,7 @@
ArrayList<EditLogOutputStream> errorStreams = null;
long start = FSNamesystem.now();
for(EditLogOutputStream eStream : editStreams) {
- Log.debug("loggin edits into " + eStream.getName() + " stream");
+ FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
if(!eStream.isOperationSupported(op))
continue;
try {
@@ -942,7 +984,19 @@
new DeprecatedUTF8(src),
new DeprecatedUTF8(dst),
FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info));
+ logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+ }
+
+ /**
+ * Add rename record to edit log
+ */
+ void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(src),
+ new DeprecatedUTF8(dst),
+ FSEditLog.toLogLong(timestamp)};
+ logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
+ toBytesWritable(options));
}
/**
@@ -975,7 +1029,22 @@
DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
}
-
+
+ /**
+ * concat(trg,src..) log
+ */
+ void logConcat(String trg, String [] srcs, long timestamp) {
+ int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+ int idx = 0;
+ info[idx++] = new DeprecatedUTF8(trg);
+ for(int i=0; i<srcs.length; i++) {
+ info[idx++] = new DeprecatedUTF8(srcs[i]);
+ }
+ info[idx] = FSEditLog.toLogLong(timestamp);
+ logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ }
+
/**
* Add delete file record to edit log
*/
@@ -1247,12 +1316,27 @@
return Long.parseLong(FSImage.readString(in));
}
- static private Block[] readBlocks(DataInputStream in) throws IOException {
+ static private BlockInfo[] readBlocks(
+ DataInputStream in,
+ int logVersion,
+ boolean isFileUnderConstruction,
+ short replication) throws IOException {
int numBlocks = in.readInt();
- Block[] blocks = new Block[numBlocks];
+ BlockInfo[] blocks = new BlockInfo[numBlocks];
+ Block blk = new Block();
+ BlockTwo oldblk = new BlockTwo();
for (int i = 0; i < numBlocks; i++) {
- blocks[i] = new Block();
- blocks[i].readFields(in);
+ if (logVersion <= -14) {
+ blk.readFields(in);
+ } else {
+ oldblk.readFields(in);
+ blk.set(oldblk.blkid, oldblk.len,
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+ }
+ if(isFileUnderConstruction && i == numBlocks-1)
+ blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+ else
+ blocks[i] = new BlockInfo(blk, replication);
}
return blocks;
}
@@ -1437,4 +1521,25 @@
processIOError(errorStreams, true);
return regAllowed;
}
+
+ static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+ BytesWritable writable = new BytesWritable();
+ writable.readFields(in);
+
+ byte[] bytes = writable.getBytes();
+ Rename[] options = new Rename[bytes.length];
+
+ for (int i = 0; i < bytes.length; i++) {
+ options[i] = Rename.valueOf(bytes[i]);
+ }
+ return options;
+ }
+
+ static BytesWritable toBytesWritable(Options.Rename... options) {
+ byte[] bytes = new byte[options.length];
+ for (int i = 0; i < options.length; i++) {
+ bytes[i] = options[i].value();
+ }
+ return new BytesWritable(bytes);
+ }
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Nov 28 20:05:56 2009
@@ -51,10 +51,12 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -63,6 +65,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Writable;
/**
@@ -372,12 +375,12 @@
if(startOpt == StartupOption.IMPORT
&& (checkpointDirs == null || checkpointDirs.isEmpty()))
throw new IOException("Cannot import image from a checkpoint. "
- + "\"fs.checkpoint.dir\" is not set." );
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
if(startOpt == StartupOption.IMPORT
&& (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
throw new IOException("Cannot import image from a checkpoint. "
- + "\"fs.checkpoint.edits.dir\" is not set." );
+ + "\"dfs.namenode.checkpoint.dir\" is not set." );
setStorageDirectories(dataDirs, editsDirs);
// 1. For each data directory calculate its state and
@@ -1075,7 +1078,7 @@
blocks[j] = new Block();
if (-14 < imgVersion) {
blocks[j].set(in.readLong(), in.readLong(),
- Block.GRANDFATHER_GENERATION_STAMP);
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP);
} else {
blocks[j].readFields(in);
}
@@ -1403,7 +1406,7 @@
}
INodeFile oldnode = (INodeFile) old;
fsDir.replaceNode(path, oldnode, cons);
- fs.leaseManager.addLease(cons.clientName, path);
+ fs.leaseManager.addLease(cons.getClientName(), path);
}
}
@@ -1419,10 +1422,17 @@
int numBlocks = in.readInt();
BlockInfo[] blocks = new BlockInfo[numBlocks];
Block blk = new Block();
- for (int i = 0; i < numBlocks; i++) {
+ int i = 0;
+ for (; i < numBlocks-1; i++) {
blk.readFields(in);
blocks[i] = new BlockInfo(blk, blockReplication);
}
+ // last block is UNDER_CONSTRUCTION
+ if(numBlocks > 0) {
+ blk.readFields(in);
+ blocks[i] = new BlockInfoUnderConstruction(
+ blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+ }
PermissionStatus perm = PermissionStatus.read(in);
String clientName = readString(in);
String clientMachine = readString(in);
@@ -1430,7 +1440,7 @@
// These locations are not used at all
int numLocs = in.readInt();
DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
- for (int i = 0; i < numLocs; i++) {
+ for (i = 0; i < numLocs; i++) {
locations[i] = new DatanodeDescriptor();
locations[i].readFields(in);
}
@@ -1893,7 +1903,7 @@
*/
static Collection<URI> getCheckpointDirs(Configuration conf,
String defaultValue) {
- Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
+ Collection<String> dirNames = conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
if (dirNames.size() == 0 && defaultValue != null) {
dirNames.add(defaultValue);
}
@@ -1919,7 +1929,7 @@
static Collection<URI> getCheckpointEditsDirs(Configuration conf,
String defaultName) {
Collection<String> dirNames =
- conf.getStringCollection("fs.checkpoint.edits.dir");
+ conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
if (dirNames.size() == 0 && defaultName != null) {
dirNames.add(defaultName);
}