Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/29 01:11:39 UTC

svn commit: r830804 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/tools/ src/java/org/apache/hadoop/hdfs/tools/off...

Author: hairong
Date: Thu Oct 29 00:11:37 2009
New Revision: 830804

URL: http://svn.apache.org/viewvc?rev=830804&view=rev
Log:
HDFS-222. Support for concatenation of files into a single file without copying. Contributed by Boris Shkolnik.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Oct 29 00:11:37 2009
@@ -13,6 +13,9 @@
     HDFS-654. Add support new atomic rename functionality in HDFS for 
     supporting rename in FileContext. (suresh)
 
+    HDFS-222. Support for concatenation of files into a single file
+    without copying. (Boris Shkolnik via hairong)
+
   IMPROVEMENTS
     
     HDFS-704. Unify build property names to facilitate cross-projects

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Oct 29 00:11:37 2009
@@ -675,6 +675,20 @@
   }
 
   /**
+   * Move blocks from srcs to trg and delete srcs.
+   * See {@link ClientProtocol#concat(String, String [])}. 
+   */
+  public void concat(String trg, String [] srcs) throws IOException {
+    checkOpen();
+    try {
+      namenode.concat(trg, srcs);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+  }
+  /**
    * Rename file or directory.
    * See {@link ClientProtocol#rename(String, String, Options.Rename...)}
    */
@@ -688,7 +702,6 @@
                                      DSQuotaExceededException.class);
     }
   }
-
   /**
    * Delete file or directory.
    * See {@link ClientProtocol#delete(String)}. 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Oct 29 00:11:37 2009
@@ -245,6 +245,23 @@
                                ) throws IOException {
     return dfs.setReplication(getPathName(src), replication);
   }
+  
+  /**
+   * This is a DFS-only operation; it is not part of FileSystem.
+   * Moves blocks from srcs to trg
+   * and deletes srcs afterwards.
+   * All blocks should be the same size.
+   * @param trg existing file to append to
+   * @param psrcs list of files (same block size, same replication)
+   * @throws IOException
+   */
+  public void concat(Path trg, Path [] psrcs) throws IOException {
+    String [] srcs = new String [psrcs.length];
+    for(int i=0; i<psrcs.length; i++) {
+      srcs[i] = getPathName(psrcs[i]);
+    }
+    dfs.concat(getPathName(trg), srcs);
+  }
 
   /** {@inheritDoc} */
   @SuppressWarnings("deprecation")
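
For readers skimming the diff, here is a minimal, illustrative sketch of calling the new DFS-only concat() from client code. The class name, paths and cluster settings below are made-up assumptions, not part of this commit; only the DistributedFileSystem.concat(Path, Path[]) signature comes from the patch above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatUsageSketch {
      public static void main(String[] args) throws Exception {
        // Assumes fs.default.name points at an HDFS namenode.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

        // The target must already exist; the sources must live in the same
        // directory and share its block size and replication (see the
        // FSNamesystem checks later in this patch).
        Path target = new Path("/data/part-all");
        Path[] srcs = { new Path("/data/part-1"), new Path("/data/part-2") };

        dfs.concat(target, srcs);  // moves the blocks into target and deletes the sources
      }
    }

After the call the sources are gone and the target's length is the sum of all the input files; no block data is copied between datanodes.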

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Oct 29 00:11:37 2009
@@ -46,10 +46,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 51: New rename method with support of destination overwrite for the use of
-   * {@link FileContext}
+   * 52: Added the concat() API.
    */
-  public static final long versionID = 51L;
+  public static final long versionID = 52L;
   
   ///////////////////////////////////////
   // File contents
@@ -243,6 +242,17 @@
   public boolean rename(String src, String dst) throws IOException;
 
   /**
+   * Moves blocks from srcs to trg and deletes srcs.
+   * 
+   * @param trg existing file
+   * @param srcs - list of existing files (same block size, same replication)
+   * @throws IOException if some arguments are invalid
+   * @throws QuotaExceededException if the operation would violate 
+   *                                any quota restriction
+   */
+  public void concat(String trg, String [] srcs) throws IOException;
+
+  /**
    * Rename src to dst.
    * <ul>
    * <li>Fails if src is a file and dst is a directory.

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Thu Oct 29 00:11:37 2009
@@ -91,7 +91,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -21;
+  public static final int LAYOUT_VERSION = -22;
   // Current version: 
-  // -21: Added new rename operation to edit log
+  // -22: Added new OP_CONCAT_DELETE operation to edit log
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Oct 29 00:11:37 2009
@@ -789,7 +789,71 @@
       }
     }
   }
+
+  /**
+   * Concatenates srcs into target and records the operation in the edit log.
+   * @param target file to move the blocks to
+   * @param srcs files to move the blocks from; they are deleted afterwards
+   * @throws IOException
+   */
+  public void concatInternal(String target, String [] srcs) throws IOException{
+    synchronized(rootDir) {
+      // actual move
+      waitForReady();
+
+      unprotectedConcat(target, srcs);
+      // do the commit
+      fsImage.getEditLog().logConcat(target, srcs, FSNamesystem.now());
+    }
+  }
+  
+
+  
+  /**
+   * Concatenates all the blocks from srcs into target
+   * and deletes the srcs files.
+   * @param target target file to move the blocks to
+   * @param srcs list of files to move the blocks from
+   * Must be public because it is also called when replaying the edit log.
+   * NOTE: it does not verify quota (not needed for concat)
+   */
+  public void unprotectedConcat(String target, String [] srcs) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
+    
+    INode [] trgINodes =  getExistingPathINodes(target);
+    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+    INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
     
+    INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    int i = 0;
+    int totalBlocks = 0;
+    for(String src : srcs) {
+      INodeFile srcInode = getFileINode(src);
+      allSrcInodes[i++] = srcInode;
+      totalBlocks += srcInode.blocks.length;  
+    }
+    trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+    
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for(INodeFile nodeToRemove: allSrcInodes) {
+      if(nodeToRemove == null) continue;
+      
+      nodeToRemove.blocks = null;
+      trgParent.removeChild(nodeToRemove);
+      count++;
+    }
+    
+    long now = FSNamesystem.now();
+    trgInode.setModificationTime(now);
+    trgParent.setModificationTime(now);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+  }
+
   /**
    * Delete the target directory and collect the blocks under it
    * 
@@ -1132,6 +1196,24 @@
     }
   }
   
+  /**
+   * updates quota without verification
+   * callers responsibility is to make sure quota is not exceeded
+   * @param inodes
+   * @param numOfINodes
+   * @param nsDelta
+   * @param dsDelta
+   */
+   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
+                                      long nsDelta, long dsDelta) {
+    for(int i=0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+        node.unprotectedUpdateNumItemsInTree(nsDelta, dsDelta);
+      }
+    }
+  }
+  
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Oct 29 00:11:37 2009
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -45,13 +47,11 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.mortbay.log.Log;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -78,6 +78,8 @@
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
   private static final byte OP_RENAME = 15;  // new rename
+  private static final byte OP_CONCAT_DELETE = 16; // concat files.
+
   /* 
    * The following operations are used to control remote edit log streams,
    * and not logged into file streams.
@@ -424,7 +426,7 @@
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
         numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpRename = 0, numOpOther = 0;
+        numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, numOpOther = 0;
     try {
       while (true) {
         long timestamp = 0;
@@ -545,6 +547,27 @@
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
         } 
+        case OP_CONCAT_DELETE: {
+          if (logVersion > -22) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpConcatDelete++;
+          int length = in.readInt();
+          if (length < 3) { // trg, at least one src, timestamp
+            throw new IOException("Incorrect data format. " 
+                                  + "Concat delete operation.");
+          }
+          String trg = FSImage.readString(in);
+          int srcSize = length - 1 - 1; //trg and timestamp
+          String [] srcs = new String [srcSize];
+          for(int i=0; i<srcSize;i++) {
+            srcs[i]= FSImage.readString(in);
+          }
+          timestamp = readLong(in);
+          fsDir.unprotectedConcat(trg, srcs);
+          break;
+        }
         case OP_RENAME_OLD: {
           numOpRenameOld++;
           int length = in.readInt();
@@ -715,6 +738,7 @@
           + " numOpSetOwner = " + numOpSetOwner
           + " numOpSetGenStamp = " + numOpSetGenStamp 
           + " numOpTimes = " + numOpTimes
+          + " numOpConcatDelete  = " + numOpConcatDelete
           + " numOpRename = " + numOpRename
           + " numOpOther = " + numOpOther);
     }
@@ -755,7 +779,7 @@
     ArrayList<EditLogOutputStream> errorStreams = null;
     long start = FSNamesystem.now();
     for(EditLogOutputStream eStream : editStreams) {
-      Log.debug("loggin edits into " + eStream.getName()  + " stream");
+      FSImage.LOG.debug("loggin edits into " + eStream.getName()  + " stream");
       if(!eStream.isOperationSupported(op))
         continue;
       try {
@@ -1005,7 +1029,22 @@
     DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
     logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
   }
-
+  
+  /**
+   * Add a concat(trg, srcs) record to the edit log.
+   */
+  void logConcat(String trg, String [] srcs, long timestamp) {
+    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+    DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+    int idx = 0;
+    info[idx++] = new DeprecatedUTF8(trg);
+    for(int i=0; i<srcs.length; i++) {
+      info[idx++] = new DeprecatedUTF8(srcs[i]);
+    }
+    info[idx] = FSEditLog.toLogLong(timestamp);
+    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+  }
+  
   /** 
    * Add delete file record to edit log
    */
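
Putting logConcat() and the new loader case together, the OP_CONCAT_DELETE record is essentially the opcode, a length, the target path, the source paths and a timestamp. The decoding sketch below is schematic only: readUTF8String()/readLongAsUTF8() are placeholder stubs standing in for the FSImage.readString()/readLong() helpers the patch actually uses, not the real wire encoding.

    import java.io.DataInput;
    import java.io.IOException;

    class ConcatRecordSketch {
      // Payload layout after the opcode byte (OP_CONCAT_DELETE = 16):
      // int length = 1 (trg) + srcs.length + 1 (timestamp),
      // then the target path, the source paths, and the timestamp.
      static void decode(DataInput in) throws IOException {
        int length = in.readInt();                // trg + srcs + timestamp
        String trg = readUTF8String(in);          // target path
        String[] srcs = new String[length - 2];   // everything between trg and timestamp
        for (int i = 0; i < srcs.length; i++) {
          srcs[i] = readUTF8String(in);           // source paths, same dir as trg
        }
        long timestamp = readLongAsUTF8(in);      // mtime, written via toLogLong()
        // replay would then call fsDir.unprotectedConcat(trg, srcs)
      }

      // Placeholder stubs so the sketch is self-contained; not the real encoding.
      static String readUTF8String(DataInput in) throws IOException { return in.readUTF(); }
      static long readLongAsUTF8(DataInput in) throws IOException { return Long.parseLong(in.readUTF()); }
    }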

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Oct 29 00:11:37 2009
@@ -764,6 +764,146 @@
     }
     return lb;
   }
+  
+  /**
+   * Moves all the blocks from srcs and appends them to trg.
+   * To avoid rollbacks we verify the validity of ALL the args
+   * before we start the actual move.
+   * @param target existing file to append to
+   * @param srcs existing files whose blocks are moved and which are then deleted
+   * @throws IOException
+   */
+  public void concat(String target, String [] srcs) throws IOException{
+    FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) + " to " + target);
+    // check safe mode
+    if (isInSafeMode()) {
+      throw new SafeModeException("concat: cannot concat " + target, safeMode);
+    }
+    
+    // verify args
+    if(target.isEmpty()) {
+      throw new IllegalArgumentException("concat: trg file name is empty");
+    }
+    if(srcs == null || srcs.length == 0) {
+      throw new IllegalArgumentException("concat:  srcs list is empty or null");
+    }
+    
+    // currently we require all the files to be in the same dir
+    String trgParent = 
+      target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+    for(String s : srcs) {
+      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+      if(! srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException
+           ("concat:  srcs and target shoould be in same dir");
+      }
+    }
+    
+    synchronized(this) {
+      // write permission for the target
+      if (isPermissionEnabled) {
+        checkPathAccess(target, FsAction.WRITE);
+
+        // and srcs
+        for(String aSrc: srcs) {
+          checkPathAccess(aSrc, FsAction.READ); // read the file
+          checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        }
+      }
+
+
+      // to make sure no two files are the same
+      Set<INode> si = new HashSet<INode>();
+
+      // we put the following prerequisite for the operation
+      // replication and blocks sizes should be the same for ALL the blocks
+      // check the target
+      INode inode = dir.getFileINode(target);
+
+      if(inode == null) {
+        throw new IllegalArgumentException("concat: trg file doesn't exist");
+      }
+      if(inode.isUnderConstruction()) {
+        throw new IllegalArgumentException("concat: trg file is uner construction");
+      }
+
+      INodeFile trgInode = (INodeFile) inode;
+
+      // per design trg shouldn't be empty and all the blocks same size
+      if(trgInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: "+ target + " file is empty");
+      }
+
+      long blockSize = trgInode.preferredBlockSize;
+
+      // check the end block to be full
+      if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+        throw new IllegalArgumentException("concat: the last block of " + target + " must be full");
+      }
+
+      si.add(trgInode);
+      short repl = trgInode.blockReplication;
+
+      // now check the srcs
+      boolean endSrc = false; // final src file doesn't have to have full end block
+      for(int i=0; i<srcs.length; i++) {
+        String src = srcs[i];
+        if(i==srcs.length-1)
+          endSrc=true;
+
+        INodeFile srcInode = dir.getFileINode(src);
+
+        if(src.isEmpty() 
+            || srcInode == null
+            || srcInode.isUnderConstruction()
+            || srcInode.blocks.length == 0) {
+          throw new IllegalArgumentException("concat: file " + src + 
+          " is invalid or empty or underConstruction");
+        }
+
+        // check replication and blocks size
+        if(repl != srcInode.blockReplication) {
+          throw new IllegalArgumentException(src + " and " + target + " " +
+              "should have same replication: "
+              + repl + " vs. " + srcInode.blockReplication);
+        }
+
+        //boolean endBlock=false;
+        // verify that all the blocks are of the same length as target
+        // should be enough to check the end blocks
+        int idx = srcInode.blocks.length-1;
+        if(endSrc)
+          idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+        if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+          throw new IllegalArgumentException("concat: blocks sizes of " + 
+              src + " and " + target + " should all be the same");
+        }
+
+        si.add(srcInode);
+      }
+
+      // make sure no two files are the same
+      if(si.size() < srcs.length+1) { // trg + srcs
+        // it means at least two files are the same
+        throw new IllegalArgumentException("at least two files are the same");
+      }
+
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+
+      dir.concatInternal(target,srcs);
+    }
+    getEditLog().logSync();
+   
+    
+    if (auditLog.isInfoEnabled()) {
+      final FileStatus stat = dir.getFileInfo(target);
+      logAuditEvent(UserGroupInformation.getCurrentUGI(),
+                    Server.getRemoteIp(),
+                    "concat", Arrays.toString(srcs), target, stat);
+    }
+   
+  }
 
   /**
    * stores the modification and access time for this inode. 
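
The validation above boils down to a few client-visible invariants: everything in the same directory, same replication and block size as the target, the target non-empty with a full last block, only the last source allowed a partial final block, no file listed twice, and nothing under construction. Below is a hedged client-side pre-flight sketch of the size and placement checks; the class and method names are illustrative, the full-block tests are approximated via length modulo block size, and the NameNode still re-validates everything against the INodes under its own lock.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ConcatPreflight {
      // Approximate restatement of the server-side argument checks.
      static void check(FileSystem fs, Path trg, Path[] srcs) throws IOException {
        FileStatus t = fs.getFileStatus(trg);
        if (t.getLen() == 0 || t.getLen() % t.getBlockSize() != 0) {
          throw new IOException("target must be non-empty with a full last block");
        }
        for (int i = 0; i < srcs.length; i++) {
          FileStatus s = fs.getFileStatus(srcs[i]);
          if (!srcs[i].getParent().equals(trg.getParent())) {
            throw new IOException("sources must be in the same directory as the target");
          }
          if (s.getReplication() != t.getReplication()
              || s.getBlockSize() != t.getBlockSize()) {
            throw new IOException("replication and block size must match the target");
          }
          if (i < srcs.length - 1 && s.getLen() % s.getBlockSize() != 0) {
            throw new IOException("only the last source may end with a partial block");
          }
        }
      }
    }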

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu Oct 29 00:11:37 2009
@@ -88,7 +88,7 @@
    * @param dsQuota diskspace quota to be set
    *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+  void setQuota(long newNsQuota, long newDsQuota) {
     nsQuota = newNsQuota;
     dsQuota = newDsQuota;
   }
@@ -122,6 +122,16 @@
     diskspace += dsDelta;
   }
   
+  /** Update the size of the tree
+   * 
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
+   **/
+  void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount = nsCount + nsDelta;
+    diskspace = diskspace + dsDelta;
+  }
+  
   /** 
    * Sets namespace and diskspace take by the directory rooted 
    * at this INode. This should be used carefully. It does not check 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Oct 29 00:11:37 2009
@@ -88,6 +88,26 @@
   }
 
   /**
+   * append array of blocks to this.blocks
+   */
+  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+    int size = this.blocks.length;
+    
+    BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+    System.arraycopy(this.blocks, 0, newlist, 0, size);
+    
+    for(INodeFile in: inodes) {
+      System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+      size += in.blocks.length;
+    }
+    
+    for(BlockInfo bi: newlist) { // all blocks, including the appended ones, now belong to this inode
+      bi.setINode(this);
+    }
+    this.blocks = newlist;
+  }
+  
+  /**
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
@@ -112,9 +132,11 @@
 
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (BlockInfo blk : blocks) {
-      v.add(blk);
-      blk.setINode(null);
+    if(blocks != null && v != null) {
+      for (BlockInfo blk : blocks) {
+        v.add(blk);
+        blk.setINode(null);
+      }
     }
     blocks = null;
     return 1;
@@ -160,6 +182,9 @@
   
   long diskspaceConsumed(Block[] blkArr) {
     long size = 0;
+    if(blkArr == null) 
+      return 0;
+    
     for (Block blk : blkArr) {
       if (blk != null) {
         size += blk.getNumBytes();

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Oct 29 00:11:37 2009
@@ -714,7 +714,13 @@
     return ret;
   }
   
-
+  /** 
+   * {@inheritDoc}
+   */
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
+  
   /** {@inheritDoc} */
   @Override
   public void rename(String src, String dst, Options.Rename... options) throws IOException {

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java?rev=830804&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/HDFSConcat.java Thu Oct 29 00:11:37 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+public class HDFSConcat {
+  private final static String def_uri = "hdfs://localhost:9000";
+  /**
+   * @param args the target file followed by the source files
+   */
+  public static void main(String... args) throws IOException {
+
+    if(args.length < 2) {
+      System.err.println("Usage HDFSConcat target srcs..");
+      System.exit(0);
+    }
+    
+    Configuration conf = new Configuration();
+    String uri = conf.get("fs.default.name", def_uri);
+    Path path = new Path(uri);
+    DistributedFileSystem dfs = 
+      (DistributedFileSystem)FileSystem.get(path.toUri(), conf);
+    
+    Path [] srcs = new Path[args.length-1];
+    for(int i=1; i<args.length; i++) {
+      srcs[i-1] = new Path(args[i]);
+    }
+    dfs.concat(new Path(args[0]), srcs);
+  }
+
+}
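
The tool above reads fs.default.name from the configuration (falling back to hdfs://localhost:9000), takes the target as its first argument and the source files after it. Below is a small, hypothetical driver showing the expected argument order; the paths are made up, and on a real cluster one would typically launch the class via "bin/hadoop org.apache.hadoop.hdfs.tools.HDFSConcat <target> <srcs...>".

    import org.apache.hadoop.hdfs.tools.HDFSConcat;

    public class RunConcatTool {
      public static void main(String[] unused) throws Exception {
        // First argument is the target; the rest are the sources to fold in.
        // Paths are hypothetical; the cluster is chosen by fs.default.name.
        HDFSConcat.main("/user/example/all", "/user/example/a", "/user/example/b");
      }
    }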

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Thu Oct 29 00:11:37 2009
@@ -96,7 +96,7 @@
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19, -20, -21};
+  private static int [] versions = {-16, -17, -18, -19, -20, -21, -22};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java Thu Oct 29 00:11:37 2009
@@ -45,7 +45,7 @@
     "\n" +
     "The oiv utility will attempt to parse correctly formed image files\n" +
     "and will abort fail with mal-formed image files. Currently the\n" +
-    "supports FSImage layout versions -16 through -19.\n" +
+    "supports FSImage layout versions -16 through -22.\n" +
     "\n" +
     "The tool works offline and does not require a running cluster in\n" +
     "order to process an image file.\n" +

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=830804&r1=830803&r2=830804&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Oct 29 00:11:37 2009
@@ -177,6 +177,8 @@
     @Deprecated
     public boolean rename(String src, String dst) throws IOException { return false; }
     
+    public void concat(String trg, String[] srcs) throws IOException {  }
+
     public void rename(String src, String dst, Rename... options) throws IOException { }
 
     public boolean delete(String src) throws IOException { return false; }

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java?rev=830804&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java Thu Oct 29 00:11:37 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHDFSConcat {
+  public static final Log LOG = LogFactory.getLog(TestHDFSConcat.class);
+
+  private static final short REPL_FACTOR = 2;
+  
+  private MiniDFSCluster cluster;
+  private NameNode nn;
+  private DistributedFileSystem dfs;
+
+  private static long blockSize = 512;
+
+  
+  private static Configuration conf;
+
+  static {
+    conf = new Configuration();
+    conf.setLong("dfs.blocksize", blockSize);
+  }
+  
+  @Before
+  public void startUpCluster() throws IOException {
+    cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null);
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
+    nn = cluster.getNameNode();
+    assertNotNull("Failed to get NameNode", nn);
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if(dfs != null) {
+      dfs.close();
+    }
+    if(cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+    }
+  }
+  
+  private void runCommand(DFSAdmin admin, String args[], boolean expectError)
+  throws Exception {
+    int val = admin.run(args);
+    if (expectError) {
+      assertEquals(val, -1);
+    } else {
+      assertTrue(val>=0);
+    }
+  }
+
+  /**
+   * Concatenates 10 files into one
+   * Verifies the final size, the deletion of the source files, and the number of blocks.
+   * @throws IOException
+   */
+  @Test
+  public void testConcat() throws IOException {
+    final int numFiles = 10;
+    long fileLen = blockSize*3;
+    FileStatus fStatus;
+    FSDataInputStream stm;
+    
+    String trg = "/trg";
+    Path trgPath = new Path(trg);
+    DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
+    fStatus  = nn.getFileInfo(trg);
+    long trgLen = fStatus.getLen();
+    long trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+       
+    Path [] files = new Path[numFiles];
+    byte [] [] bytes = new byte [numFiles][(int)fileLen];
+    LocatedBlocks [] lblocks = new LocatedBlocks[numFiles];
+    long [] lens = new long [numFiles];
+    
+    
+    int i = 0;
+    for(i=0; i<files.length; i++) {
+      files[i] = new Path("/file"+i);
+      Path path = files[i];
+      System.out.println("Creating file " + path);
+      DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1);
+    
+      fStatus = nn.getFileInfo(path.toUri().getPath());
+      lens[i] = fStatus.getLen();
+      assertEquals(trgLen, lens[i]); // file of the same length.
+      
+      lblocks[i] = nn.getBlockLocations(path.toUri().getPath(), 0, lens[i]);
+      
+      //read the file
+      stm = dfs.open(path);
+      stm.readFully(0, bytes[i]);
+      //bytes[i][10] = 10;
+      stm.close();
+    }
+    
+    // check permissions -try the operation with the "wrong" user
+    final UnixUserGroupInformation user1 = new UnixUserGroupInformation(
+        "theDoctor", new String[] { "tardis" });
+    DistributedFileSystem hdfs = (DistributedFileSystem)logonAs(user1, conf, dfs);
+    try {
+      hdfs.concat(trgPath, files);
+      fail("Permission exception expected");
+    } catch (IOException ie) {
+      System.out.println("Got expected exception for permissions:"
+          + ie.getLocalizedMessage());
+      // expected
+    }
+    
+    // check count update
+    ContentSummary cBefore = dfs.getContentSummary(trgPath.getParent());
+    
+    // now concatenate
+    dfs.concat(trgPath, files);
+    
+    // verify  count
+    ContentSummary cAfter = dfs.getContentSummary(trgPath.getParent());
+    assertEquals(cBefore.getFileCount(), cAfter.getFileCount()+files.length);
+    
+    // verify other stuff
+    long totalLen = trgLen;
+    long totalBlocks = trgBlocks;
+    for(i=0; i<files.length; i++) {
+      totalLen += lens[i];
+      totalBlocks += lblocks[i].locatedBlockCount();
+    }
+    System.out.println("total len=" + totalLen + "; totalBlocks=" + totalBlocks);
+    
+    
+    fStatus = nn.getFileInfo(trg);
+    trgLen  = fStatus.getLen(); // new length
+    
+    // read the resulting file
+    stm = dfs.open(trgPath);
+    byte[] byteFileConcat = new byte[(int)trgLen];
+    stm.readFully(0, byteFileConcat);
+    stm.close();
+    
+    trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+    
+    //verifications
+    // 1. number of blocks
+    assertEquals(trgBlocks, totalBlocks); 
+        
+    // 2. file lengths
+    assertEquals(trgLen, totalLen);
+    
+    // 3. removal of the src file
+    for(Path p: files) {
+      fStatus = nn.getFileInfo(p.toUri().getPath());
+      assertNull("File " + p + " still exists", fStatus); // file shouldn't exist
+      // try to create a file with the same name
+      DFSTestUtil.createFile(dfs, p, fileLen, REPL_FACTOR, 1); 
+    }
+  
+    // 4. content
+    checkFileContent(byteFileConcat, bytes);
+    
+    // add a small file (less than a block)
+    Path smallFile = new Path("/sfile");
+    int sFileLen = 10;
+    DFSTestUtil.createFile(dfs, smallFile, sFileLen, REPL_FACTOR, 1);
+    dfs.concat(trgPath, new Path [] {smallFile});
+    
+    fStatus = nn.getFileInfo(trg);
+    trgLen  = fStatus.getLen(); // new length
+    
+    // check number of blocks
+    trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+    assertEquals(trgBlocks, totalBlocks+1);
+    
+    // and length
+    assertEquals(trgLen, totalLen+sFileLen);
+    
+  }
+
+  // compare content
+  private void checkFileContent(byte[] concat, byte[][] bytes ) {
+    int idx=0;
+    boolean mismatch = false;
+    
+    for(byte [] bb: bytes) {
+      for(byte b: bb) {
+        if(b != concat[idx++]) {
+          mismatch=true;
+          break;
+        }
+      }
+      if(mismatch)
+        break;
+    }
+    assertFalse("File content of concatenated file is different", mismatch);
+  }
+
+  /***
+   * Create a new configuration for the specified user and return a filesystem
+   * accessed by that user
+   */
+  static private FileSystem logonAs(UnixUserGroupInformation user,
+      Configuration conf, FileSystem hdfs) throws IOException {
+    Configuration conf2 = new Configuration(conf);
+    UnixUserGroupInformation.saveToConf(conf2,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, user);
+
+    return FileSystem.get(conf2);
+  }
+
+  // test case when final block is not of a full length
+  @Test
+  public void testConcatNotCompleteBlock() throws IOException {
+    long trgFileLen = blockSize*3;
+    long srcFileLen = blockSize*3+20; // block at the end - not full
+
+    
+    // create first file
+    String name1="/trg", name2="/src";
+    Path filePath1 = new Path(name1);
+    DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
+    
+    FileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+    long fileLen = fStatus.getLen();
+    assertEquals(fileLen, trgFileLen);
+    
+    //read the file
+    FSDataInputStream stm = dfs.open(filePath1);
+    byte[] byteFile1 = new byte[(int)trgFileLen];
+    stm.readFully(0, byteFile1);
+    stm.close();
+    
+    LocatedBlocks lb1 = cluster.getNameNode().getBlockLocations(name1, 0, trgFileLen);
+    
+    Path filePath2 = new Path(name2);
+    DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
+    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fileLen = fStatus.getLen();
+    assertEquals(srcFileLen, fileLen);
+    
+    // read the file
+    stm = dfs.open(filePath2);
+    byte[] byteFile2 = new byte[(int)srcFileLen];
+    stm.readFully(0, byteFile2);
+    stm.close();
+    
+    LocatedBlocks lb2 = cluster.getNameNode().getBlockLocations(name2, 0, srcFileLen);
+    
+    
+    System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);
+    
+    // move the blocks
+    dfs.concat(filePath1, new Path [] {filePath2});
+    
+    long totalLen = trgFileLen + srcFileLen;
+    fStatus = cluster.getNameNode().getFileInfo(name1);
+    fileLen = fStatus.getLen();
+    
+    // read the resulting file
+    stm = dfs.open(filePath1);
+    byte[] byteFileConcat = new byte[(int)fileLen];
+    stm.readFully(0, byteFileConcat);
+    stm.close();
+    
+    LocatedBlocks lbConcat = cluster.getNameNode().getBlockLocations(name1, 0, fileLen);
+    
+    //verifications
+    // 1. number of blocks
+    assertEquals(lbConcat.locatedBlockCount(), 
+        lb1.locatedBlockCount() + lb2.locatedBlockCount());
+    
+    // 2. file lengths
+    System.out.println("file1 len="+fileLen+"; total len="+totalLen);
+    assertEquals(fileLen, totalLen);
+    
+    // 3. removal of the src file
+    fStatus = cluster.getNameNode().getFileInfo(name2);
+    assertNull("File "+name2+ "still exists", fStatus); // file shouldn't exist
+  
+    // 4. content
+    checkFileContent(byteFileConcat, new byte [] [] {byteFile1, byteFile2});
+  }
+  
+  /**
+   * test illegal args cases
+   */
+  @Test
+  public void testIllegalArg() throws IOException {
+    long fileLen = blockSize*3;
+    
+    Path parentDir  = new Path ("/parentTrg");
+    assertTrue(dfs.mkdirs(parentDir));
+    Path trg = new Path(parentDir, "trg");
+    DFSTestUtil.createFile(dfs, trg, fileLen, REPL_FACTOR, 1);
+
+    // must be in the same dir
+    {
+      // create first file
+      Path dir1 = new Path ("/dir1");
+      assertTrue(dfs.mkdirs(dir1));
+      Path src = new Path(dir1, "src");
+      DFSTestUtil.createFile(dfs, src, fileLen, REPL_FACTOR, 1);
+      
+      try {
+        dfs.concat(trg, new Path [] {src});
+        fail("didn't fail for src and trg in different directories");
+      } catch (Exception e) {
+        // expected
+      }
+    }
+    // non existing file
+    try {
+      dfs.concat(trg, new Path [] {new Path("test1/a")}); // non existing file
+      fail("didn't fail with invalid arguments");
+    } catch (Exception e) {
+      //expected
+    }
+    // empty arg list
+    try {
+      dfs.concat(trg, new Path [] {}); // empty array
+      fail("didn't fail with invalid arguments");
+    } catch (Exception e) {
+      // expected
+    }
+ 
+  }
+}