Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/29 01:11:39 UTC
svn commit: r830804 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/tools/
src/java/org/apache/hadoop/hdfs/tools/off...
Author: hairong
Date: Thu Oct 29 00:11:37 2009
New Revision: 830804
URL: http://svn.apache.org/viewvc?rev=830804&view=rev
Log:
HDFS-222. Support for concatenating files into a single file without copying. Contributed by Boris Shkolnik.
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Oct 29 00:11:37 2009
@@ -13,6 +13,9 @@
HDFS-654. Add support new atomic rename functionality in HDFS for
supporting rename in FileContext. (suresh)
+ HDFS-222. Support for concatenating files into a single file
+ without copying. (Boris Shkolnik via hairong)
+
IMPROVEMENTS
HDFS-704. Unify build property names to facilitate cross-projects
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Oct 29 00:11:37 2009
@@ -675,6 +675,20 @@
}
/**
+ * Move blocks from srcs to trg and delete srcs.
+ * See {@link ClientProtocol#concat(String, String [])}.
+ */
+ public void concat(String trg, String [] srcs) throws IOException {
+ checkOpen();
+ try {
+ namenode.concat(trg, srcs);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class);
+ }
+ }
+ /**
* Rename file or directory.
* See {@link ClientProtocol#rename(String, String, Options.Rename...)}
*/
@@ -688,7 +702,6 @@
DSQuotaExceededException.class);
}
}
-
/**
* Delete file or directory.
* See {@link ClientProtocol#delete(String)}.
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Oct 29 00:11:37 2009
@@ -245,6 +245,23 @@
) throws IOException {
return dfs.setReplication(getPathName(src), replication);
}
+
+ /**
+ * This is a DFS-only operation; it is not part of FileSystem.
+ * Moves blocks from srcs to trg and deletes srcs afterwards.
+ * All blocks should be the same size.
+ * @param trg existing file to append to
+ * @param psrcs list of files (same block size, same replication)
+ * @throws IOException
+ */
+ public void concat(Path trg, Path [] psrcs) throws IOException {
+ String [] srcs = new String [psrcs.length];
+ for(int i=0; i<psrcs.length; i++) {
+ srcs[i] = getPathName(psrcs[i]);
+ }
+ dfs.concat(getPathName(trg), srcs);
+ }
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
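
For illustration, a minimal client of the new DistributedFileSystem.concat() call might look like the sketch below. The cluster configuration and file names are hypothetical; all files must already share the same block size and replication.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumes fs.default.name points at a running HDFS cluster
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        Path target = new Path("/data/part-0");
        Path[] srcs = { new Path("/data/part-1"), new Path("/data/part-2") };
        // moves the blocks of part-1 and part-2 onto part-0 and deletes the srcs
        dfs.concat(target, srcs);
      }
    }
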
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Oct 29 00:11:37 2009
@@ -46,10 +46,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 51: New rename method with support of destination overwrite for the use of
- * {@link FileContext}
+ * 52: added concat() API
*/
- public static final long versionID = 51L;
+ public static final long versionID = 52L;
///////////////////////////////////////
// File contents
@@ -243,6 +242,17 @@
public boolean rename(String src, String dst) throws IOException;
/**
+ * Moves blocks from srcs to trg and deletes srcs.
+ *
+ * @param trg existing file
+ * @param srcs list of existing files (same block size, same replication)
+ * @throws IOException if some arguments are invalid
+ * @throws QuotaExceededException if the operation would violate
+ * any quota restriction
+ */
+ public void concat(String trg, String [] srcs) throws IOException;
+
+ /**
* Rename src to dst.
* <ul>
* <li>Fails if src is a file and dst is a directory.
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Thu Oct 29 00:11:37 2009
@@ -91,7 +91,7 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -21;
+ public static final int LAYOUT_VERSION = -22;
// Current version:
- // -21: Added new rename operation to edit log
+ // -22: Added new concat (OP_CONCAT_DELETE) operation to edit log
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Oct 29 00:11:37 2009
@@ -789,7 +789,71 @@
}
}
}
+
+ /**
+ * Concatenate the blocks of srcs to target and record the edit.
+ * @param target file to move the blocks to
+ * @param srcs files to move the blocks from; deleted afterwards
+ * @throws IOException
+ */
+ public void concatInternal(String target, String [] srcs) throws IOException{
+ synchronized(rootDir) {
+ // actual move
+ waitForReady();
+
+ unprotectedConcat(target, srcs);
+ // do the commit
+ fsImage.getEditLog().logConcat(target, srcs, FSNamesystem.now());
+ }
+ }
+
+
+
+ /**
+ * Concat all the blocks from srcs to target and delete the srcs files.
+ * Must be public because it is also called when loading edit logs.
+ * NOTE: it does not verify quota (not needed for concat); it only
+ * adjusts the cached namespace count on the parent directories.
+ * @param target file to move the blocks to
+ * @param srcs list of files to move the blocks from
+ */
+ public void unprotectedConcat(String target, String [] srcs) throws IOException {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ }
+ // do the move
+
+ INode [] trgINodes = getExistingPathINodes(target);
+ INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+ INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
+ INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+ int i = 0;
+ int totalBlocks = 0;
+ for(String src : srcs) {
+ INodeFile srcInode = getFileINode(src);
+ allSrcInodes[i++] = srcInode;
+ totalBlocks += srcInode.blocks.length;
+ }
+ trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+
+ // since all the files are in the same dir, we can use one parent to remove them
+ int count = 0;
+ for(INodeFile nodeToRemove: allSrcInodes) {
+ if(nodeToRemove == null) continue;
+
+ nodeToRemove.blocks = null;
+ trgParent.removeChild(nodeToRemove);
+ count++;
+ }
+
+ long now = FSNamesystem.now();
+ trgInode.setModificationTime(now);
+ trgParent.setModificationTime(now);
+ // update quota on the parent directory ('count' files removed, 0 space)
+ unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+ }
+
/**
* Delete the target directory and collect the blocks under it
*
@@ -1132,6 +1196,24 @@
}
}
+ /**
+ * Updates quota usage without verification; it is the caller's
+ * responsibility to make sure the quota is not exceeded.
+ * @param inodes inodes along the path from the root
+ * @param numOfINodes number of inodes, starting at index 0, to update
+ * @param nsDelta namespace count change
+ * @param dsDelta diskspace change
+ */
+ void unprotectedUpdateCount(INode[] inodes, int numOfINodes,
+ long nsDelta, long dsDelta) {
+ for(int i=0; i < numOfINodes; i++) {
+ if (inodes[i].isQuotaSet()) { // a directory with quota
+ INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
+ node.unprotectedUpdateNumItemsInTree(nsDelta, dsDelta);
+ }
+ }
+ }
+
/** Return the name of the path represented by inodes at [0, pos] */
private static String getFullPathName(INode[] inodes, int pos) {
StringBuilder fullPathName = new StringBuilder();
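
A note on the quota handling above: unprotectedUpdateCount() walks the already-resolved inode path and adjusts the cached usage on any directory that has a quota set, without re-verifying the limits. In unprotectedConcat() the call is

    // 'count' src files were removed; their blocks were moved, not freed,
    // so only the namespace count changes:
    unprotectedUpdateCount(trgINodes, trgINodes.length - 1, -count, 0);

where trgINodes.length - 1 stops short of the target file itself, so only the ancestor directories are updated. Skipping verification is safe here because concat never increases usage.
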
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Oct 29 00:11:37 2009
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +32,7 @@
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -45,13 +47,11 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
-import org.mortbay.log.Log;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -78,6 +78,8 @@
private static final byte OP_TIMES = 13; // sets mod & access time on a file
private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
private static final byte OP_RENAME = 15; // new rename
+ private static final byte OP_CONCAT_DELETE = 16; // concat files.
+
/*
* The following operations are used to control remote edit log streams,
* and not logged into file streams.
@@ -424,7 +426,7 @@
int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
- numOpTimes = 0, numOpRename = 0, numOpOther = 0;
+ numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, numOpOther = 0;
try {
while (true) {
long timestamp = 0;
@@ -545,6 +547,27 @@
fsDir.unprotectedSetReplication(path, replication, null);
break;
}
+ case OP_CONCAT_DELETE: {
+ if (logVersion > -22) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ }
+ numOpConcatDelete++;
+ int length = in.readInt();
+ if (length < 3) { // trg, srcs.., timestamp
+ throw new IOException("Incorrect data format. "
+ + "Concat delete operation.");
+ }
+ String trg = FSImage.readString(in);
+ int srcSize = length - 1 - 1; //trg and timestamp
+ String [] srcs = new String [srcSize];
+ for(int i=0; i<srcSize;i++) {
+ srcs[i]= FSImage.readString(in);
+ }
+ timestamp = readLong(in);
+ fsDir.unprotectedConcat(trg, srcs);
+ break;
+ }
case OP_RENAME_OLD: {
numOpRenameOld++;
int length = in.readInt();
@@ -715,6 +738,7 @@
+ " numOpSetOwner = " + numOpSetOwner
+ " numOpSetGenStamp = " + numOpSetGenStamp
+ " numOpTimes = " + numOpTimes
+ + " numOpConcatDelete = " + numOpConcatDelete
+ " numOpRename = " + numOpRename
+ " numOpOther = " + numOpOther);
}
@@ -755,7 +779,7 @@
ArrayList<EditLogOutputStream> errorStreams = null;
long start = FSNamesystem.now();
for(EditLogOutputStream eStream : editStreams) {
- Log.debug("loggin edits into " + eStream.getName() + " stream");
+ FSImage.LOG.debug("logging edits into " + eStream.getName() + " stream");
if(!eStream.isOperationSupported(op))
continue;
try {
@@ -1005,7 +1029,22 @@
DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
}
-
+
+ /**
+ * concat(trg,src..) log
+ */
+ void logConcat(String trg, String [] srcs, long timestamp) {
+ int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+ int idx = 0;
+ info[idx++] = new DeprecatedUTF8(trg);
+ for(int i=0; i<srcs.length; i++) {
+ info[idx++] = new DeprecatedUTF8(srcs[i]);
+ }
+ info[idx] = FSEditLog.toLogLong(timestamp);
+ logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ }
+
/**
* Add delete file record to edit log
*/
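
For orientation, the OP_CONCAT_DELETE record written by logConcat() and consumed by the loader above has the following shape; this layout is reconstructed from the code in this patch, not from a formal format specification.

    // byte  opcode = 16                OP_CONCAT_DELETE
    // int   length = srcs.length + 2   trg + srcs + timestamp
    // UTF8  trg                        target path
    // UTF8  srcs[0] .. srcs[n-1]       source paths, in order
    // UTF8  timestamp                  modification time, written as a
    //                                  decimal string via toLogLong()
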
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Oct 29 00:11:37 2009
@@ -764,6 +764,146 @@
}
return lb;
}
+
+ /**
+ * Moves all the blocks from srcs and appends them to trg.
+ * To avoid rollbacks we verify the validity of ALL the args
+ * before we start the actual move.
+ * @param target existing file to append to
+ * @param srcs files whose blocks are moved and which are deleted afterwards
+ * @throws IOException
+ */
+ public void concat(String target, String [] srcs) throws IOException{
+ FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) + " to " + target);
+ // check safe mode
+ if (isInSafeMode()) {
+ throw new SafeModeException("concat: cannot concat " + target, safeMode);
+ }
+
+ // verify args
+ if(target.isEmpty()) {
+ throw new IllegalArgumentException("concat: trg file name is empty");
+ }
+ if(srcs == null || srcs.length == 0) {
+ throw new IllegalArgumentException("concat: srcs list is empty or null");
+ }
+
+ // currently we require all the files to be in the same dir
+ String trgParent =
+ target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+ for(String s : srcs) {
+ String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+ if(! srcParent.equals(trgParent)) {
+ throw new IllegalArgumentException
+ ("concat: srcs and target shoould be in same dir");
+ }
+ }
+
+ synchronized(this) {
+ // write permission for the target
+ if (isPermissionEnabled) {
+ checkPathAccess(target, FsAction.WRITE);
+
+ // and srcs
+ for(String aSrc: srcs) {
+ checkPathAccess(aSrc, FsAction.READ); // read the file
+ checkParentAccess(aSrc, FsAction.WRITE); // for delete
+ }
+ }
+
+
+ // to make sure no two files are the same
+ Set<INode> si = new HashSet<INode>();
+
+ // we put the following prerequisite for the operation
+ // replication and blocks sizes should be the same for ALL the blocks
+ // check the target
+ INode inode = dir.getFileINode(target);
+
+ if(inode == null) {
+ throw new IllegalArgumentException("concat: trg file doesn't exist");
+ }
+ if(inode.isUnderConstruction()) {
+ throw new IllegalArgumentException("concat: trg file is uner construction");
+ }
+
+ INodeFile trgInode = (INodeFile) inode;
+
+ // per design trg shouldn't be empty and all the blocks same size
+ if(trgInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: "+ target + " file is empty");
+ }
+
+ long blockSize = trgInode.preferredBlockSize;
+
+ // check the end block to be full
+ if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+ throw new IllegalArgumentException("concat: the last block of " + target + " must be full");
+ }
+
+ si.add(trgInode);
+ short repl = trgInode.blockReplication;
+
+ // now check the srcs
+ boolean endSrc = false; // final src file doesn't have to have full end block
+ for(int i=0; i<srcs.length; i++) {
+ String src = srcs[i];
+ if(i==srcs.length-1)
+ endSrc=true;
+
+ INodeFile srcInode = dir.getFileINode(src);
+
+ if(src.isEmpty()
+ || srcInode == null
+ || srcInode.isUnderConstruction()
+ || srcInode.blocks.length == 0) {
+ throw new IllegalArgumentException("concat: file " + src +
+ " is invalid or empty or underConstruction");
+ }
+
+ // check replication and blocks size
+ if(repl != srcInode.blockReplication) {
+ throw new IllegalArgumentException(src + " and " + target + " " +
+ "should have same replication: "
+ + repl + " vs. " + srcInode.blockReplication);
+ }
+
+ // verify that all the blocks are of the same length as target
+ // should be enough to check the end blocks
+ int idx = srcInode.blocks.length-1;
+ if(endSrc)
+ idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+ if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+ throw new IllegalArgumentException("concat: blocks sizes of " +
+ src + " and " + target + " should all be the same");
+ }
+
+ si.add(srcInode);
+ }
+
+ // make sure no two files are the same
+ if(si.size() < srcs.length+1) { // trg + srcs
+ // it means at least two files are the same
+ throw new IllegalArgumentException("at least two files are the same");
+ }
+
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+
+ dir.concatInternal(target,srcs);
+ }
+ getEditLog().logSync();
+
+
+ if (auditLog.isInfoEnabled()) {
+ final FileStatus stat = dir.getFileInfo(target);
+ logAuditEvent(UserGroupInformation.getCurrentUGI(),
+ Server.getRemoteIp(),
+ "concat", Arrays.toString(srcs), target, stat);
+ }
+
+ }
/**
* stores the modification and access time for this inode.
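
To recap the validation above: concat() is deliberately strict so that the operation never has to roll back. A caller-side sketch of the same-directory check is shown below; the authoritative checks remain the ones inside the synchronized block.

    // Preconditions enforced by FSNamesystem.concat():
    // - namenode is not in safe mode; target is non-empty; srcs is non-empty
    // - target and all srcs live in the same parent directory
    // - every file exists, is not under construction, and has at least one block
    // - every src has the same replication factor as the target
    // - every block is full (preferredBlockSize bytes), except possibly the
    //   last block of the last src
    // - target and srcs are all distinct files
    static void checkSameParent(String target, String[] srcs) {
      String trgParent = target.substring(0, target.lastIndexOf('/'));
      for (String s : srcs) {
        if (!s.substring(0, s.lastIndexOf('/')).equals(trgParent)) {
          throw new IllegalArgumentException(
              "concat: " + s + " and " + target + " must be in the same directory");
        }
      }
    }
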
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu Oct 29 00:11:37 2009
@@ -88,7 +88,7 @@
* @param dsQuota diskspace quota to be set
*
*/
- void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+ void setQuota(long newNsQuota, long newDsQuota) {
nsQuota = newNsQuota;
dsQuota = newDsQuota;
}
@@ -122,6 +122,16 @@
diskspace += dsDelta;
}
+ /** Update the size of the tree
+ *
+ * @param nsDelta the change of the tree size
+ * @param dsDelta change to disk space occupied
+ **/
+ void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
+ nsCount = nsCount + nsDelta;
+ diskspace = diskspace + dsDelta;
+ }
+
/**
* Sets namespace and diskspace take by the directory rooted
* at this INode. This should be used carefully. It does not check
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Oct 29 00:11:37 2009
@@ -88,6 +88,26 @@
}
/**
+ * Append the blocks of the given files to this.blocks.
+ */
+ void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+ int size = this.blocks.length;
+
+ BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+ System.arraycopy(this.blocks, 0, newlist, 0, size);
+
+ for(INodeFile in: inodes) {
+ System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+ size += in.blocks.length;
+ }
+
+ this.blocks = newlist;
+
+ // re-point every block, including the newly appended ones, at this inode
+ for(BlockInfo bi: this.blocks) {
+ bi.setINode(this);
+ }
+ }
+
+ /**
* add a block to the block list
*/
void addBlock(BlockInfo newblock) {
@@ -112,9 +132,11 @@
int collectSubtreeBlocksAndClear(List<Block> v) {
parent = null;
- for (BlockInfo blk : blocks) {
- v.add(blk);
- blk.setINode(null);
+ if(blocks != null && v != null) {
+ for (BlockInfo blk : blocks) {
+ v.add(blk);
+ blk.setINode(null);
+ }
}
blocks = null;
return 1;
@@ -160,6 +182,9 @@
long diskspaceConsumed(Block[] blkArr) {
long size = 0;
+ if(blkArr == null)
+ return 0;
+
for (Block blk : blkArr) {
if (blk != null) {
size += blk.getNumBytes();
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Oct 29 00:11:37 2009
@@ -714,7 +714,13 @@
return ret;
}
-
+ /**
+ * {@inheritDoc}
+ */
+ public void concat(String trg, String[] src) throws IOException {
+ namesystem.concat(trg, src);
+ }
+
/** {@inheritDoc} */
@Override
public void rename(String src, String dst, Options.Rename... options) throws IOException {
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java?rev=830804&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java Thu Oct 29 00:11:37 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+public class HDFSConcat {
+ private final static String def_uri = "hdfs://localhost:9000";
+ /**
+ * @param args
+ */
+ public static void main(String... args) throws IOException {
+
+ if(args.length < 2) {
+ System.err.println("Usage HDFSConcat target srcs..");
+ System.exit(0);
+ }
+
+ Configuration conf = new Configuration();
+ String uri = conf.get("fs.default.name", def_uri);
+ Path path = new Path(uri);
+ DistributedFileSystem dfs =
+ (DistributedFileSystem)FileSystem.get(path.toUri(), conf);
+
+ Path [] srcs = new Path[args.length-1];
+ for(int i=1; i<args.length; i++) {
+ srcs[i-1] = new Path(args[i]);
+ }
+ dfs.concat(new Path(args[0]), srcs);
+ }
+
+}
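
For reference, the class above can be launched with the standard Hadoop runner, along the lines of

    bin/hadoop org.apache.hadoop.hdfs.tools.HDFSConcat /data/part-0 /data/part-1 /data/part-2

where the paths are illustrative. The first argument is the target; the remaining arguments are the srcs, which are deleted after their blocks are moved.
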
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Thu Oct 29 00:11:37 2009
@@ -96,7 +96,7 @@
class ImageLoaderCurrent implements ImageLoader {
protected final DateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm");
- private static int [] versions = {-16, -17, -18, -19, -20, -21};
+ private static int [] versions = {-16, -17, -18, -19, -20, -21, -22};
private int imageVersion = 0;
/* (non-Javadoc)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java Thu Oct 29 00:11:37 2009
@@ -45,7 +45,7 @@
"\n" +
"The oiv utility will attempt to parse correctly formed image files\n" +
"and will abort fail with mal-formed image files. Currently the\n" +
- "supports FSImage layout versions -16 through -19.\n" +
+ "supports FSImage layout versions -16 through -22.\n" +
"\n" +
"The tool works offline and does not require a running cluster in\n" +
"order to process an image file.\n" +
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Oct 29 00:11:37 2009
@@ -177,6 +177,8 @@
@Deprecated
public boolean rename(String src, String dst) throws IOException { return false; }
+ public void concat(String trg, String[] srcs) throws IOException { }
+
public void rename(String src, String dst, Rename... options) throws IOException { }
public boolean delete(String src) throws IOException { return false; }
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java?rev=830804&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java Thu Oct 29 00:11:37 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHDFSConcat {
+ public static final Log LOG = LogFactory.getLog(TestHDFSConcat.class);
+
+ private static final short REPL_FACTOR = 2;
+
+ private MiniDFSCluster cluster;
+ private NameNode nn;
+ private DistributedFileSystem dfs;
+
+ private static long blockSize = 512;
+
+
+ private static Configuration conf;
+
+ static {
+ conf = new Configuration();
+ conf.setLong("dfs.blocksize", blockSize);
+ }
+
+ @Before
+ public void startUpCluster() throws IOException {
+ cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null);
+ assertNotNull("Failed Cluster Creation", cluster);
+ cluster.waitClusterUp();
+ dfs = (DistributedFileSystem) cluster.getFileSystem();
+ assertNotNull("Failed to get FileSystem", dfs);
+ nn = cluster.getNameNode();
+ assertNotNull("Failed to get NameNode", nn);
+ }
+
+ @After
+ public void shutDownCluster() throws IOException {
+ if(dfs != null) {
+ dfs.close();
+ }
+ if(cluster != null) {
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
+ }
+ }
+
+ private void runCommand(DFSAdmin admin, String args[], boolean expectError)
+ throws Exception {
+ int val = admin.run(args);
+ if (expectError) {
+ assertEquals(-1, val); // JUnit convention: expected value first
+ } else {
+ assertTrue(val >= 0);
+ }
+ }
+
+ /**
+ * Concatenates 10 files into one.
+ * Verifies the final size, deletion of the source files, and the number of blocks.
+ * @throws IOException
+ */
+ @Test
+ public void testConcat() throws IOException {
+ final int numFiles = 10;
+ long fileLen = blockSize*3;
+ FileStatus fStatus;
+ FSDataInputStream stm;
+
+ String trg = "/trg";
+ Path trgPath = new Path(trg);
+ DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
+ fStatus = nn.getFileInfo(trg);
+ long trgLen = fStatus.getLen();
+ long trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+
+ Path [] files = new Path[numFiles];
+ byte [] [] bytes = new byte [numFiles][(int)fileLen];
+ LocatedBlocks [] lblocks = new LocatedBlocks[numFiles];
+ long [] lens = new long [numFiles];
+
+
+ int i = 0;
+ for(i=0; i<files.length; i++) {
+ files[i] = new Path("/file"+i);
+ Path path = files[i];
+ System.out.println("Creating file " + path);
+ DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1);
+
+ fStatus = nn.getFileInfo(path.toUri().getPath());
+ lens[i] = fStatus.getLen();
+ assertEquals(trgLen, lens[i]); // file of the same length.
+
+ lblocks[i] = nn.getBlockLocations(path.toUri().getPath(), 0, lens[i]);
+
+ //read the file
+ stm = dfs.open(path);
+ stm.readFully(0, bytes[i]);
+ stm.close();
+ }
+
+ // check permissions -try the operation with the "wrong" user
+ final UnixUserGroupInformation user1 = new UnixUserGroupInformation(
+ "theDoctor", new String[] { "tardis" });
+ DistributedFileSystem hdfs = (DistributedFileSystem)logonAs(user1, conf, dfs);
+ try {
+ hdfs.concat(trgPath, files);
+ fail("Permission exception expected");
+ } catch (IOException ie) {
+ System.out.println("Got expected exception for permissions:"
+ + ie.getLocalizedMessage());
+ // expected
+ }
+
+ // check count update
+ ContentSummary cBefore = dfs.getContentSummary(trgPath.getParent());
+
+ // now concatenate
+ dfs.concat(trgPath, files);
+
+ // verify count
+ ContentSummary cAfter = dfs.getContentSummary(trgPath.getParent());
+ assertEquals(cBefore.getFileCount(), cAfter.getFileCount()+files.length);
+
+ // verify other stuff
+ long totalLen = trgLen;
+ long totalBlocks = trgBlocks;
+ for(i=0; i<files.length; i++) {
+ totalLen += lens[i];
+ totalBlocks += lblocks[i].locatedBlockCount();
+ }
+ System.out.println("total len=" + totalLen + "; totalBlocks=" + totalBlocks);
+
+
+ fStatus = nn.getFileInfo(trg);
+ trgLen = fStatus.getLen(); // new length
+
+ // read the resulting file
+ stm = dfs.open(trgPath);
+ byte[] byteFileConcat = new byte[(int)trgLen];
+ stm.readFully(0, byteFileConcat);
+ stm.close();
+
+ trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+
+ //verifications
+ // 1. number of blocks
+ assertEquals(trgBlocks, totalBlocks);
+
+ // 2. file lengths
+ assertEquals(trgLen, totalLen);
+
+ // 3. removal of the src file
+ for(Path p: files) {
+ fStatus = nn.getFileInfo(p.toUri().getPath());
+ assertNull("File " + p + " still exists", fStatus); // file shouldn't exist
+ // try to create a file with the same name
+ DFSTestUtil.createFile(dfs, p, fileLen, REPL_FACTOR, 1);
+ }
+
+ // 4. content
+ checkFileContent(byteFileConcat, bytes);
+
+ // add a small file (less than a block)
+ Path smallFile = new Path("/sfile");
+ int sFileLen = 10;
+ DFSTestUtil.createFile(dfs, smallFile, sFileLen, REPL_FACTOR, 1);
+ dfs.concat(trgPath, new Path [] {smallFile});
+
+ fStatus = nn.getFileInfo(trg);
+ trgLen = fStatus.getLen(); // new length
+
+ // check number of blocks
+ trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+ assertEquals(trgBlocks, totalBlocks+1);
+
+ // and length
+ assertEquals(trgLen, totalLen+sFileLen);
+
+ }
+
+ // compare content
+ private void checkFileContent(byte[] concat, byte[][] bytes ) {
+ int idx=0;
+ boolean mismatch = false;
+
+ for(byte [] bb: bytes) {
+ for(byte b: bb) {
+ if(b != concat[idx++]) {
+ mismatch=true;
+ break;
+ }
+ }
+ if(mismatch)
+ break;
+ }
+ assertFalse("File content of concatenated file is different", mismatch);
+ }
+
+ /**
+ * Create a new configuration for the specified user and return a filesystem
+ * accessed by that user
+ */
+ static private FileSystem logonAs(UnixUserGroupInformation user,
+ Configuration conf, FileSystem hdfs) throws IOException {
+ Configuration conf2 = new Configuration(conf);
+ UnixUserGroupInformation.saveToConf(conf2,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME, user);
+
+ return FileSystem.get(conf2);
+ }
+
+ // test case when final block is not of a full length
+ @Test
+ public void testConcatNotCompleteBlock() throws IOException {
+ long trgFileLen = blockSize*3;
+ long srcFileLen = blockSize*3+20; // block at the end - not full
+
+
+ // create first file
+ String name1="/trg", name2="/src";
+ Path filePath1 = new Path(name1);
+ DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
+
+ FileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+ long fileLen = fStatus.getLen();
+ assertEquals(fileLen, trgFileLen);
+
+ //read the file
+ FSDataInputStream stm = dfs.open(filePath1);
+ byte[] byteFile1 = new byte[(int)trgFileLen];
+ stm.readFully(0, byteFile1);
+ stm.close();
+
+ LocatedBlocks lb1 = cluster.getNameNode().getBlockLocations(name1, 0, trgFileLen);
+
+ Path filePath2 = new Path(name2);
+ DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
+ fStatus = cluster.getNameNode().getFileInfo(name2);
+ fileLen = fStatus.getLen();
+ assertEquals(srcFileLen, fileLen);
+
+ // read the file
+ stm = dfs.open(filePath2);
+ byte[] byteFile2 = new byte[(int)srcFileLen];
+ stm.readFully(0, byteFile2);
+ stm.close();
+
+ LocatedBlocks lb2 = cluster.getNameNode().getBlockLocations(name2, 0, srcFileLen);
+
+
+ System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);
+
+ // move the blocks
+ dfs.concat(filePath1, new Path [] {filePath2});
+
+ long totalLen = trgFileLen + srcFileLen;
+ fStatus = cluster.getNameNode().getFileInfo(name1);
+ fileLen = fStatus.getLen();
+
+ // read the resulting file
+ stm = dfs.open(filePath1);
+ byte[] byteFileConcat = new byte[(int)fileLen];
+ stm.readFully(0, byteFileConcat);
+ stm.close();
+
+ LocatedBlocks lbConcat = cluster.getNameNode().getBlockLocations(name1, 0, fileLen);
+
+ //verifications
+ // 1. number of blocks
+ assertEquals(lbConcat.locatedBlockCount(),
+ lb1.locatedBlockCount() + lb2.locatedBlockCount());
+
+ // 2. file lengths
+ System.out.println("file1 len="+fileLen+"; total len="+totalLen);
+ assertEquals(fileLen, totalLen);
+
+ // 3. removal of the src file
+ fStatus = cluster.getNameNode().getFileInfo(name2);
+ assertNull("File "+name2+ "still exists", fStatus); // file shouldn't exist
+
+ // 4. content
+ checkFileContent(byteFileConcat, new byte [] [] {byteFile1, byteFile2});
+ }
+
+ /**
+ * test illegal args cases
+ */
+ @Test
+ public void testIllegalArg() throws IOException {
+ long fileLen = blockSize*3;
+
+ Path parentDir = new Path ("/parentTrg");
+ assertTrue(dfs.mkdirs(parentDir));
+ Path trg = new Path(parentDir, "trg");
+ DFSTestUtil.createFile(dfs, trg, fileLen, REPL_FACTOR, 1);
+
+ // must be in the same dir
+ {
+ // create first file
+ Path dir1 = new Path ("/dir1");
+ assertTrue(dfs.mkdirs(dir1));
+ Path src = new Path(dir1, "src");
+ DFSTestUtil.createFile(dfs, src, fileLen, REPL_FACTOR, 1);
+
+ try {
+ dfs.concat(trg, new Path [] {src});
+ fail("didn't fail for src and trg in different directories");
+ } catch (Exception e) {
+ // expected
+ }
+ }
+ // non existing file
+ try {
+ dfs.concat(trg, new Path [] {new Path("test1/a")}); // non existing file
+ fail("didn't fail with invalid arguments");
+ } catch (Exception e) {
+ //expected
+ }
+ // empty arg list
+ try {
+ dfs.concat(trg, new Path [] {}); // empty array
+ fail("didn't fail with invalid arguments");
+ } catch (Exception e) {
+ // expected
+ }
+
+ }
+}