You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/20 23:00:48 UTC

svn commit: r1095512 [2/3] - in /hadoop/hdfs/branches/HDFS-1052: ./ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/ap...

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Apr 20 21:00:45 2011
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -226,8 +229,15 @@ public class BackupImage extends FSImage
           // update NameSpace in memory
           backupInputStream.setBytes(data);
           FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          logLoader.loadEditRecords(storage.getLayoutVersion(),
-                    backupInputStream.getDataInputStream(), true);
+          int logVersion = storage.getLayoutVersion();
+          BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+          DataInputStream in = new DataInputStream(bin);
+          Checksum checksum = null;
+          if (logVersion <= -28) { // support fsedits checksum
+            checksum = FSEditLog.getChecksum();
+            in = new DataInputStream(new CheckedInputStream(bin, checksum));
+          }
+          logLoader.loadEditRecords(logVersion, in, checksum, true);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
         case INPROGRESS:
@@ -346,14 +356,21 @@ public class BackupImage extends FSImage
     if(jSpoolFile.exists()) {
       // load edits.new
       EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
-      DataInputStream in = edits.getDataInputStream();
+      BufferedInputStream bin = new BufferedInputStream(edits);
+      DataInputStream in = new DataInputStream(bin);
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-      numEdits += logLoader.loadFSEdits(in, false);
+      int logVersion = logLoader.readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
+      }
+      numEdits += logLoader.loadEditRecords(logVersion, in, checksum, false);
 
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
-                                            in, true);
+      numEdits += logLoader.loadEditRecords(logVersion,
+                                            in, checksum, true);
       getFSNamesystem().dir.updateCountForINodeWithQuota();
       edits.close();
     }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Apr 20 21:00:45 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -84,10 +85,18 @@ class EditLogFileOutputStream extends Ed
   /** {@inheritDoc} */
   @Override
   void write(byte op, Writable... writables) throws IOException {
+    int start = bufCurrent.getLength();
     write(op);
     for (Writable w : writables) {
       w.write(bufCurrent);
     }
+    // write transaction checksum
+    int end = bufCurrent.getLength();
+    Checksum checksum = FSEditLog.getChecksum();
+    checksum.reset();
+    checksum.update(bufCurrent.getData(), start, end-start);
+    int sum = (int)checksum.getValue();
+    bufCurrent.writeInt(sum);
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Apr 20 21:00:45 2011
@@ -1285,9 +1285,10 @@ class FSDirectory implements Closeable {
    * Check whether the path specifies a directory
    */
   boolean isDir(String src) throws UnresolvedLinkException {
+    src = normalizePath(src);
     readLock();
     try {
-      INode node = rootDir.getNode(normalizePath(src), false);
+      INode node = rootDir.getNode(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -1385,6 +1386,12 @@ class FSDirectory implements Closeable {
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
+    if (inodes[0].isRoot()) {
+      if (pos == 0) return Path.SEPARATOR;
+    } else {
+      fullPathName.append(inodes[0].getLocalName());
+    }
+    
     for (int i=1; i<=pos; i++) {
       fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
     }
@@ -2018,7 +2025,7 @@ class FSDirectory implements Closeable {
         return null;
       }
     }
-    final String userName = UserGroupInformation.getCurrentUser().getUserName();
+    final String userName = dirPerms.getUserName();
     INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
       new PermissionStatus(userName, null, FsPermission.getDefault()));         
     if (newNode == null) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 20 21:00:45 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
@@ -90,6 +92,18 @@ public class FSEditLog implements NNStor
 
   private NNStorage storage;
 
+  private static ThreadLocal<Checksum> localChecksum =
+    new ThreadLocal<Checksum>() {
+    protected Checksum initialValue() {
+      return new PureJavaCrc32();
+    }
+  };
+
+  /** Get a thread local checksum */
+  public static Checksum getChecksum() {
+    return localChecksum.get();
+  }
+
   private static class TransactionId {
     public long txid;
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Apr 20 21:00:45 2011
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -54,42 +58,62 @@ public class FSEditLogLoader {
    * along.
    */
   int loadFSEdits(EditLogInputStream edits) throws IOException {
-    DataInputStream in = edits.getDataInputStream();
     long startTime = now();
-    int numEdits = loadFSEdits(in, true);
+    int numEdits = loadFSEdits(edits, true);
     FSImage.LOG.info("Edits file " + edits.getName() 
         + " of size " + edits.length() + " edits # " + numEdits 
         + " loaded in " + (now()-startTime)/1000 + " seconds.");
     return numEdits;
   }
 
-  int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
+  /**
+   * Read the header of fsedit log
+   * @param in fsedit stream
+   * @return the edit log version number
+   * @throws IOException if error occurs
+   */
+  int readLogVersion(DataInputStream in) throws IOException {
+    int logVersion = 0;
+    // Read log file version. Could be missing. 
+    in.mark(4);
+    // If edits log is greater than 2G, available method will return negative
+    // numbers, so we avoid having to call available
+    boolean available = true;
+    try {
+      logVersion = in.readByte();
+    } catch (EOFException e) {
+      available = false;
+    }
+    if (available) {
+      in.reset();
+      logVersion = in.readInt();
+      if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+        throw new IOException(
+            "Unexpected version of the file system log file: "
+            + logVersion + ". Current version = " 
+            + FSConstants.LAYOUT_VERSION + ".");
+    }
+    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+      "Unsupported version " + logVersion;
+    return logVersion;
+  }
+  
+  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
+    BufferedInputStream bin = new BufferedInputStream(edits);
+    DataInputStream in = new DataInputStream(bin);
+    
     int numEdits = 0;
     int logVersion = 0;
 
     try {
-      // Read log file version. Could be missing. 
-      in.mark(4);
-      // If edits log is greater than 2G, available method will return negative
-      // numbers, so we avoid having to call available
-      boolean available = true;
-      try {
-        logVersion = in.readByte();
-      } catch (EOFException e) {
-        available = false;
-      }
-      if (available) {
-        in.reset();
-        logVersion = in.readInt();
-        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-          throw new IOException(
-                          "Unexpected version of the file system log file: "
-                          + logVersion + ". Current version = " 
-                          + FSConstants.LAYOUT_VERSION + ".");
+      logVersion = readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
       }
-      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-                            "Unsupported version " + logVersion;
-      numEdits = loadEditRecords(logVersion, in, false);
+
+      numEdits = loadEditRecords(logVersion, in, checksum, false);
     } finally {
       if(closeOnExit)
         in.close();
@@ -101,7 +125,7 @@ public class FSEditLogLoader {
 
   @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
-      boolean closeOnExit) throws IOException {
+      Checksum checksum, boolean closeOnExit) throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
     String clientName = null;
@@ -123,6 +147,9 @@ public class FSEditLogLoader {
         long blockSize = 0;
         FSEditLogOpCodes opCode;
         try {
+          if (checksum != null) {
+            checksum.reset();
+          }
           in.mark(1);
           byte opCodeByte = in.readByte();
           opCode = FSEditLogOpCodes.fromByte(opCodeByte);
@@ -480,6 +507,7 @@ public class FSEditLogLoader {
           throw new IOException("Never seen opCode " + opCode);
         }
         }
+        validateChecksum(in, checksum, numEdits);
       }
     } finally {
       if(closeOnExit)
@@ -505,6 +533,22 @@ public class FSEditLogLoader {
     return numEdits;
   }
 
+  /**
+   * Validate a transaction's checksum
+   */
+  private static void validateChecksum(
+      DataInputStream in, Checksum checksum, int tid)
+  throws IOException {
+    if (checksum != null) {
+      int calculatedChecksum = (int)checksum.getValue();
+      int readChecksum = in.readInt(); // read in checksum
+      if (readChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction " + tid + " is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " + readChecksum, tid);
+      }
+    }
+  }
 
   /**
    * A class to read in blocks stored in the old format. The only two

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Apr 20 21:00:45 2011
@@ -30,6 +30,7 @@ import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -179,7 +180,11 @@ class FSImageFormat {
 
         // load all inodes
         LOG.info("Number of files = " + numFiles);
-        loadFullNameINodes(numFiles, in);
+        if (imgVersion <= -30) {
+          loadLocalNameINodes(numFiles, in);
+        } else {
+          loadFullNameINodes(numFiles, in);
+        }
 
         // load datanode info
         this.loadDatanodes(in);
@@ -215,6 +220,64 @@ class FSImageFormat {
     fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());    
   }
 
+  /** 
+   * load fsimage files assuming only local names are stored
+   *   
+   * @param numFiles number of files expected to be read
+   * @param in image input stream
+   * @throws IOException
+   */  
+   private void loadLocalNameINodes(long numFiles, DataInputStream in) 
+   throws IOException {
+     assert imgVersion <= -30; // -30: store only local name in image
+     assert numFiles > 0;
+
+     // load root
+     if( in.readShort() != 0) {
+       throw new IOException("First node is not root");
+     }   
+     INode root = loadINode(in);
+     // update the root's attributes
+     updateRootAttr(root);
+     numFiles--;
+
+     // load rest of the nodes directory by directory
+     while (numFiles > 0) {
+       numFiles -= loadDirectory(in);
+     }
+     if (numFiles != 0) {
+       throw new IOException("Read unexpect number of files: " + -numFiles);
+     }
+   }
+   
+   /**
+    * Load all children of a directory
+    * 
+    * @param in
+    * @return number of child inodes read
+    * @throws IOException
+    */
+   private int loadDirectory(DataInputStream in) throws IOException {
+     String parentPath = FSImageSerialization.readString(in);
+     FSDirectory fsDir = namesystem.dir;
+     INode parent = fsDir.rootDir.getNode(parentPath, true);
+     if (parent == null || !parent.isDirectory()) {
+       throw new IOException("Path " + parentPath + "is not a directory.");
+     }
+
+     int numChildren = in.readInt();
+     for(int i=0; i<numChildren; i++) {
+       // load single inode
+       byte[] localName = new byte[in.readShort()];
+       in.readFully(localName); // read local name
+       INode newNode = loadINode(in); // read rest of inode
+
+       // add to parent
+       namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+     }
+     return numChildren;
+   }
+
   /**
    * load fsimage files assuming full path names are stored
    * 
@@ -485,9 +548,10 @@ class FSImageFormat {
         byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
         ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
         // save the rest of the nodes
-        saveImage(strbuf, 0, fsDir.rootDir, out);
+        saveImage(strbuf, fsDir.rootDir, out);
+        // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
         sourceNamesystem.saveSecretManagerState(out);
         strbuf = null;
@@ -511,28 +575,33 @@ class FSImageFormat {
      * This is a recursive procedure, which first saves all children of
      * a current directory and then moves inside the sub-directories.
      */
-    private static void saveImage(ByteBuffer parentPrefix,
-                                  int prefixLength,
+    private static void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
-      int newPrefixLength = prefixLength;
-      if (current.getChildrenRaw() == null)
+      List<INode> children = current.getChildrenRaw();
+      if (children == null || children.isEmpty())
         return;
-      for(INode child : current.getChildren()) {
+      // print prefix (parent directory name)
+      int prefixLen = currentDirName.position();
+      if (prefixLen == 0) {  // root
+        out.writeShort(PATH_SEPARATOR.length);
+        out.write(PATH_SEPARATOR);
+      } else {  // non-root directories
+        out.writeShort(prefixLen);
+        out.write(currentDirName.array(), 0, prefixLen);
+      }
+      out.writeInt(children.size());
+      for(INode child : children) {
         // print all children first
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+        FSImageSerialization.saveINode2Image(child, out);
       }
-      for(INode child : current.getChildren()) {
+      for(INode child : children) {
         if(!child.isDirectory())
           continue;
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        newPrefixLength = parentPrefix.position();
-        saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+        saveImage(currentDirName, (INodeDirectory)child, out);
+        currentDirName.position(prefixLen);
       }
-      parentPrefix.position(prefixLength);
     }
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Apr 20 21:00:45 2011
@@ -145,12 +145,11 @@ public class FSImageSerialization {
   /*
    * Save one inode's attributes to the image.
    */
-  static void saveINode2Image(ByteBuffer name,
-                              INode node,
+  static void saveINode2Image(INode node,
                               DataOutputStream out) throws IOException {
-    int nameLen = name.position();
-    out.writeShort(nameLen);
-    out.write(name.array(), name.arrayOffset(), nameLen);
+    byte[] name = node.getLocalNameBytes();
+    out.writeShort(name.length);
+    out.write(name);
     FsPermission filePerm = TL_DATA.get().FILE_PERM;
     if (node.isDirectory()) {
       out.writeShort(0);  // replication

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 20 21:00:45 2011
@@ -259,6 +259,8 @@ public class FSNamesystem implements FSC
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
+  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -529,6 +531,8 @@ public class FSNamesystem implements FSC
         + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
+
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
@@ -1329,22 +1333,16 @@ public class FSNamesystem implements FSC
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
-    writeLock();
-    try {
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean append = flag.contains(CreateFlag.APPEND);
-    boolean create = flag.contains(CreateFlag.CREATE);
-
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
           + ", createParent=" + createParent
           + ", replication=" + replication
-          + ", overwrite=" + overwrite
-          + ", append=" + append);
+          + ", createFlag=" + flag.toString());
     }
-
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot create file" + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
@@ -1354,14 +1352,16 @@ public class FSNamesystem implements FSC
     // Verify that the destination does not exist as a directory already.
     boolean pathExists = dir.exists(src);
     if (pathExists && dir.isDir(src)) {
-      throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+      throw new FileAlreadyExistsException("Cannot create file " + src
+          + "; already exists as a directory.");
     }
 
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && pathExists)) {
         checkPathAccess(src, FsAction.WRITE);
-      }
-      else {
+      } else {
         checkAncestorAccess(src, FsAction.WRITE);
       }
     }
@@ -1434,34 +1434,27 @@ public class FSNamesystem implements FSC
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
-      if (append) {
-        if (myFile == null) {
-          if(!create)
-            throw new FileNotFoundException("failed to append to non-existent file "
-              + src + " on client " + clientMachine);
-          else {
-            //append & create a nonexist file equals to overwrite
-            return startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
-          }
-        } else if (myFile.isDirectory()) {
-          throw new IOException("failed to append to directory " + src 
-                                +" on client " + clientMachine);
+      boolean create = flag.contains(CreateFlag.CREATE);
+      if (myFile == null) {
+        if (!create) {
+          throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+            + src + " on client " + clientMachine);
         }
-      } else if (!dir.isValidToCreate(src)) {
+      } else {
+        // File exists - must be one of append or overwrite
         if (overwrite) {
           delete(src, true);
-        } else {
-          throw new IOException("failed to create file " + src 
-                                +" on client " + clientMachine
-                                +" either because the filename is invalid or the file exists");
+        } else if (!append) {
+          throw new FileAlreadyExistsException("failed to create file " + src
+              + " on client " + clientMachine
+              + " because the file exists");
         }
       }
 
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      if (append) {
+      if (append && myFile != null) {
         //
         // Replace current node with a INodeUnderConstruction.
         // Recreate in-memory lease record.
@@ -1663,6 +1656,53 @@ public class FSNamesystem implements FSC
     return b;
   }
 
+  /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
+  LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    //check if the feature is enabled
+    dtpReplaceDatanodeOnFailure.checkEnabled();
+
+    final DatanodeDescriptor clientnode;
+    final long preferredblocksize;
+    readLock();
+    try {
+      //check safe mode
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add datanode; src=" + src
+            + ", blk=" + blk, safeMode);
+      }
+
+      //check lease
+      final INodeFileUnderConstruction file = checkLease(src, clientName);
+      clientnode = file.getClientNode();
+      preferredblocksize = file.getPreferredBlockSize();
+    } finally {
+      readUnlock();
+    }
+
+    //find datanode descriptors
+    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+    for(DatanodeInfo d : existings) {
+      final DatanodeDescriptor descriptor = getDatanode(d);
+      if (descriptor != null) {
+        chosen.add(descriptor);
+      }
+    }
+
+    // choose new datanodes.
+    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+        src, numAdditionalNodes, clientnode, chosen, true,
+        excludes, preferredblocksize);
+    final LocatedBlock lb = new LocatedBlock(blk, targets);
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+    }
+    return lb;
+  }
+
   /**
    * The client would like to let go of the given block
    */
@@ -2691,7 +2731,6 @@ public class FSNamesystem implements FSC
    * Get registrationID for datanodes based on the namespaceID.
    * 
    * @see #registerDatanode(DatanodeRegistration)
-   * @see FSImage#newNamespaceID()
    * @return registration ID
    */
   public String getRegistrationID() {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Apr 20 21:00:45 2011
@@ -233,12 +233,8 @@ abstract class INode implements Comparab
 
 
   String getLocalParentDir() {
-    INode p_node=getParent();
-
-    if(p_node == null)
-      return "/";
-    else
-      return p_node.getFullPathName();
+    INode inode = isRoot() ? this : getParent();
+    return (inode != null) ? inode.getFullPathName() : "";
   }
 
   /**
@@ -271,12 +267,7 @@ abstract class INode implements Comparab
 
   /** {@inheritDoc} */
   public String toString() {
-    String i_path=getFullPathName();
-
-    if(i_path.length() == 0)
-      i_path="/";
-
-    return "\"" + i_path + "\":"
+    return "\"" + getFullPathName() + "\":"
     + getUserName() + ":" + getGroupName() + ":"
     + (isDirectory()? "d": "-") + getFsPermission();
   }
@@ -470,7 +461,9 @@ abstract class INode implements Comparab
                         long nsQuota,
                         long dsQuota,
                         long preferredBlockSize) {
-    if (blocks == null) {
+    if (symlink.length() != 0) { // check if symbolic link
+      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+    }  else if (blocks == null) { //not sym link and blocks null? directory!
       if (nsQuota >= 0 || dsQuota >= 0) {
         return new INodeDirectoryWithQuota(
             permissions, modificationTime, nsQuota, dsQuota);
@@ -478,10 +471,6 @@ abstract class INode implements Comparab
       // regular directory
       return new INodeDirectory(permissions, modificationTime);
     }
-    // check if symbolic link
-    if (symlink.length() != 0) {
-      return new INodeSymlink(symlink, modificationTime, atime, permissions);
-    } 
     // file
     return new INodeFile(permissions, blocks, replication,
         modificationTime, atime, preferredBlockSize);

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Apr 20 21:00:45 2011
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -846,6 +845,33 @@ public class NameNode implements Namenod
     return locatedBlock;
   }
 
+  @Override
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    myMetrics.numGetAdditionalDatanodeOps.inc();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
   /**
    * The client needs to give up on the block.
    */
@@ -1202,7 +1228,7 @@ public class NameNode implements Namenod
     }
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     namesystem.createSymlink(target, link,
-      new PermissionStatus(ugi.getUserName(), null, dirPerms), createParent);
+      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
   }
 
   /** @inheritDoc */

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Wed Apr 20 21:00:45 2011
@@ -72,6 +72,8 @@ public class NameNodeMetrics implements 
                           new MetricsTimeVaryingInt("FileInfoOps", registry);
     public MetricsTimeVaryingInt numAddBlockOps = 
                           new MetricsTimeVaryingInt("AddBlockOps", registry);
+    public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+        = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
     public MetricsTimeVaryingInt numcreateSymlinkOps = 
                           new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
     public MetricsTimeVaryingInt numgetLinkTargetOps = 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Wed Apr 20 21:00:45 2011
@@ -269,13 +269,21 @@ public class DFSAdmin extends FsShell {
     super(conf);
   }
   
+  protected DistributedFileSystem getDFS() throws IOException {
+    FileSystem fs = getFS();
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IllegalArgumentException("FileSystem " + fs.getUri() + 
+      " is not a distributed file system");
+    }
+    return (DistributedFileSystem)fs;
+  }
+  
   /**
    * Gives a report on how the FileSystem is doing.
    * @exception IOException if the filesystem does not exist.
    */
   public void report() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      DistributedFileSystem dfs = getDFS();
       FsStatus ds = dfs.getStatus();
       long capacity = ds.getCapacity();
       long used = ds.getUsed();
@@ -342,7 +350,6 @@ public class DFSAdmin extends FsShell {
           System.out.println();
         }     
       }
-    }
   }
 
   /**
@@ -353,10 +360,6 @@ public class DFSAdmin extends FsShell {
    * @exception IOException if the filesystem does not exist.
    */
   public void setSafeMode(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return;
-    }
     if (idx != argv.length - 1) {
       printUsage("-safemode");
       return;
@@ -377,7 +380,7 @@ public class DFSAdmin extends FsShell {
       printUsage("-safemode");
       return;
     }
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     boolean inSafeMode = dfs.setSafeMode(action);
 
     //
@@ -407,12 +410,7 @@ public class DFSAdmin extends FsShell {
   public int saveNamespace() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.saveNamespace();
     exitCode = 0;
    
@@ -428,17 +426,12 @@ public class DFSAdmin extends FsShell {
   public int restoreFaileStorage(String arg) throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
     if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
       System.err.println("restoreFailedStorage valid args are true|false|check");
       return exitCode;
     }
     
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     Boolean res = dfs.restoreFailedStorage(arg);
     System.out.println("restoreFailedStorage is set to " + res);
     exitCode = 0;
@@ -455,12 +448,7 @@ public class DFSAdmin extends FsShell {
   public int refreshNodes() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.refreshNodes();
     exitCode = 0;
    
@@ -636,18 +624,10 @@ public class DFSAdmin extends FsShell {
    * @exception IOException 
    */
   public int finalizeUpgrade() throws IOException {
-    int exitCode = -1;
-
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.finalizeUpgrade();
-    exitCode = 0;
-   
-    return exitCode;
+    
+    return 0;
   }
 
   /**
@@ -658,10 +638,7 @@ public class DFSAdmin extends FsShell {
    * @exception IOException 
    */
   public int upgradeProgress(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return -1;
-    }
+    
     if (idx != argv.length - 1) {
       printUsage("-upgradeProgress");
       return -1;
@@ -679,7 +656,7 @@ public class DFSAdmin extends FsShell {
       return -1;
     }
 
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
     String statusText = (status == null ? 
         "There are no upgrades in progress." :
@@ -698,7 +675,7 @@ public class DFSAdmin extends FsShell {
    */
   public int metaSave(String[] argv, int idx) throws IOException {
     String pathname = argv[idx];
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.metaSave(pathname);
     System.out.println("Created file " + pathname + " on server " +
                        dfs.getUri());
@@ -713,8 +690,7 @@ public class DFSAdmin extends FsShell {
    * @throws IOException If an error while getting datanode report
    */
   public int printTopology() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      DistributedFileSystem dfs = getDFS();
       DFSClient client = dfs.getClient();
       DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
       
@@ -749,7 +725,6 @@ public class DFSAdmin extends FsShell {
 
         System.out.println();
       }
-    }
     return 0;
   }
   
@@ -1052,13 +1027,13 @@ public class DFSAdmin extends FsShell {
       } else if ("-metasave".equals(cmd)) {
         exitCode = metaSave(argv, i);
       } else if (ClearQuotaCommand.matches(cmd)) {
-        exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetQuotaCommand.matches(cmd)) {
-        exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetQuotaCommand(argv, i, getDFS()).runAll();
       } else if (ClearSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshServiceAcl();
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java Wed Apr 20 21:00:45 2011
@@ -80,5 +80,6 @@ public enum EditsElement {
     KEY_ID,
     KEY_EXPIRY_DATE,
     KEY_LENGTH,
-    KEY_BLOB
+    KEY_BLOB,
+    CHECKSUM
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Wed Apr 20 21:00:45 2011
@@ -17,26 +17,15 @@
  */
 package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.EOFException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ShortToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.IntToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.LongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VLongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringUTF8Token;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringTextToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BlobToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BytesWritableToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.EmptyToken;
 
 /**
  * EditsLoaderCurrent processes Hadoop EditLogs files and walks over
@@ -49,7 +38,7 @@ import static org.apache.hadoop.hdfs.too
 class EditsLoaderCurrent implements EditsLoader {
 
   private static int [] supportedVersions = {
-    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27 };
+    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -464,6 +453,10 @@ class EditsLoaderCurrent implements Edit
         visitOpCode(editsOpCode);
 
         v.leaveEnclosingElement(); // DATA
+        
+        if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
+          v.visitInt(EditsElement.CHECKSUM);
+        }
         v.leaveEnclosingElement(); // RECORD
       } while(editsOpCode != FSEditLogOpCodes.OP_INVALID);
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Wed Apr 20 21:00:45 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
 import org.apache.hadoop.io.Text;
@@ -121,7 +120,7 @@ class ImageLoaderCurrent implements Imag
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int [] versions = 
-    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27};
+    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31};
   private int imageVersion = 0;
 
   /* (non-Javadoc)
@@ -334,34 +333,105 @@ class ImageLoaderCurrent implements Imag
       long numInodes, boolean skipBlocks) throws IOException {
     v.visitEnclosingElement(ImageElement.INODES,
         ImageElement.NUM_INODES, numInodes);
+    
+    if (imageVersion <= -30) { // local file name
+      processLocalNameINodes(in, v, numInodes, skipBlocks);
+    } else { // full path name
+      processFullNameINodes(in, v, numInodes, skipBlocks);
+    }
 
-    for(long i = 0; i < numInodes; i++) {
-      v.visitEnclosingElement(ImageElement.INODE);
-      v.visit(ImageElement.INODE_PATH, FSImageSerialization.readString(in));
-      v.visit(ImageElement.REPLICATION, in.readShort());
-      v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
-      if(imageVersion <= -17) // added in version -17
-        v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
-      v.visit(ImageElement.BLOCK_SIZE, in.readLong());
-      int numBlocks = in.readInt();
-
-      processBlocks(in, v, numBlocks, skipBlocks);
-
-      // File or directory
-      if (numBlocks > 0 || numBlocks == -1) {
-        v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
-        if(imageVersion <= -18) // added in version -18
-          v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
-      }
-      if (imageVersion <= -23 && numBlocks == -2) {
-        v.visit(ImageElement.SYMLINK, Text.readString(in));
+    
+    v.leaveEnclosingElement(); // INodes
+  }
+  
+  /**
+   * Process image with full path name
+   * 
+   * @param in image stream
+   * @param v visitor
+   * @param numInodes number of indoes to read
+   * @param skipBlocks skip blocks or not
+   * @throws IOException if there is any error occurs
+   */
+  private void processLocalNameINodes(DataInputStream in, ImageVisitor v,
+      long numInodes, boolean skipBlocks) throws IOException {
+    // process root
+    processINode(in, v, skipBlocks, "");
+    numInodes--;
+    while (numInodes > 0) {
+      numInodes -= processDirectory(in, v, skipBlocks);
+    }
+  }
+  
+  private int processDirectory(DataInputStream in, ImageVisitor v,
+     boolean skipBlocks) throws IOException {
+    String parentName = FSImageSerialization.readString(in);
+    int numChildren = in.readInt();
+    for (int i=0; i<numChildren; i++) {
+      processINode(in, v, skipBlocks, parentName);
+    }
+    return numChildren;
+  }
+  
+   /**
+    * Process image with full path name
+    * 
+    * @param in image stream
+    * @param v visitor
+    * @param numInodes number of indoes to read
+    * @param skipBlocks skip blocks or not
+    * @throws IOException if there is any error occurs
+    */
+   private void processFullNameINodes(DataInputStream in, ImageVisitor v,
+       long numInodes, boolean skipBlocks) throws IOException {
+     for(long i = 0; i < numInodes; i++) {
+       processINode(in, v, skipBlocks, null);
+     }
+   }
+   
+   /**
+    * Process an INode
+    * 
+    * @param in image stream
+    * @param v visitor
+    * @param skipBlocks skip blocks or not
+    * @param parentName the name of its parent node
+    * @return the number of Children
+    * @throws IOException
+    */
+  private void processINode(DataInputStream in, ImageVisitor v,
+      boolean skipBlocks, String parentName) throws IOException {
+    v.visitEnclosingElement(ImageElement.INODE);
+    String pathName = FSImageSerialization.readString(in);
+    if (parentName != null) {  // local name
+      pathName = "/" + pathName;
+      if (!"/".equals(parentName)) { // children of non-root directory
+        pathName = parentName + pathName;
       }
+    }
 
-      processPermission(in, v);
-      v.leaveEnclosingElement(); // INode
+    v.visit(ImageElement.INODE_PATH, pathName);
+    v.visit(ImageElement.REPLICATION, in.readShort());
+    v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
+    if(imageVersion <= -17) // added in version -17
+      v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
+    v.visit(ImageElement.BLOCK_SIZE, in.readLong());
+    int numBlocks = in.readInt();
+
+    processBlocks(in, v, numBlocks, skipBlocks);
+
+    // File or directory
+    if (numBlocks > 0 || numBlocks == -1) {
+      v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
+      if(imageVersion <= -18) // added in version -18
+        v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
     }
-    
-    v.leaveEnclosingElement(); // INodes
+    if (imageVersion <= -23 && numBlocks == -2) {
+      v.visit(ImageElement.SYMLINK, Text.readString(in));
+    }
+
+    processPermission(in, v);
+    v.leaveEnclosingElement(); // INode
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Wed Apr 20 21:00:45 2011
@@ -343,7 +343,7 @@ public class DataTransferTestUtil {
       if (!test.isSuccess() && p.contains(index, id)) {
         FiTestUtil.LOG.info(toString(id));
         if (maxDuration <= 0) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }
@@ -391,7 +391,7 @@ public class DataTransferTestUtil {
         + minDuration + "," + maxDuration + ")";
         FiTestUtil.LOG.info(s);
         if (maxDuration <= 1) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Wed Apr 20 21:00:45 2011
@@ -73,14 +73,17 @@ public class FiTestUtil {
 
   /**
    * Sleep.
-   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   * @return true if sleep exits normally; false if InterruptedException.
    */
-  public static void sleep(long ms) {
+  public static boolean sleep(long ms) {
+    LOG.info("Sleep " + ms + " ms");
     try {
       Thread.sleep(ms);
     } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+      LOG.info("Sleep is interrupted", e);
+      return false;
     }
+    return true;
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Wed Apr 20 21:00:45 2011
@@ -45,6 +45,11 @@ import org.apache.hadoop.util.DiskChecke
 privileged public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
+  BlockReceiver BlockReceiver.PacketResponder.getReceiver(){
+    LOG.info("FI: getReceiver() " + getClass().getName());
+    return BlockReceiver.this;
+  }
+
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
     call(* receivePacket(..)) && target(blockreceiver);
 	
@@ -82,7 +87,7 @@ privileged public aspect BlockReceiverAs
 
   after(BlockReceiver.PacketResponder responder)
       throws IOException: afterDownstreamStatusRead(responder) {
-    final DataNode d = responder.receiver.getDataNode();
+    final DataNode d = responder.getReceiver().getDataNode();
     DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
     if (dtTest != null)
       dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeId());

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Wed Apr 20 21:00:45 2011
@@ -22,18 +22,13 @@ import java.io.IOException;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
-import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
-import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
-import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
-import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,6 +60,10 @@ public class TestFiDataTransferProtocol 
         REPLICATION, BLOCKSIZE);
   }
 
+  {
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   /**
    * 1. create files with dfs
    * 2. write 1 byte
@@ -70,9 +71,9 @@ public class TestFiDataTransferProtocol 
    * 4. open the same file
    * 5. read the 1 byte and compare results
    */
-  private static void write1byte(String methodName) throws IOException {
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).format(true).build();
+  static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");
@@ -305,184 +306,4 @@ public class TestFiDataTransferProtocol 
     final String methodName = FiTestUtil.getMethodName();
     runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
   }
-
-  private static void runPipelineCloseTest(String methodName,
-      Action<DatanodeID, IOException> a) throws IOException {
-    FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
-        .initTest();
-    t.fiPipelineClose.set(a);
-    write1byte(methodName);
-  }
-
-  private static void run41_43(String name, int i) throws IOException {
-    runPipelineCloseTest(name, new SleepAction(name, i, 3000));
-  }
-
-  private static void runPipelineCloseAck(String name, int i, DataNodeAction a
-      ) throws IOException {
-    FiTestUtil.LOG.info("Running " + name + " ...");
-    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
-    final MarkerConstraint marker = new MarkerConstraint(name);
-    t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
-    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
-    write1byte(name);
-  }
-
-  private static void run39_40(String name, int i) throws IOException {
-    runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 never responses after received close ack DN2.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_39() throws IOException {
-    run39_40(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 never responses after received close ack DN1.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_40() throws IOException {
-    run39_40(FiTestUtil.getMethodName(), 0);
-  }
-  
-  /**
-   * Pipeline close with DN0 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_41() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 0);
-  }
-
-  /**
-   * Pipeline close with DN1 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_42() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close with DN2 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_43() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 2);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_44() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 0));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_45() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 1));
-  }
-
-  /**
-   * Pipeline close:
-   * DN2 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN2 bad.
-   */
-  @Test
-  public void pipeline_Fi_46() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 2));
-  }
-
-  private static void run47_48(String name, int i) throws IOException {
-    runPipelineCloseAck(name, i, new OomAction(name, i));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 throws an OutOfMemoryException right after
-   * it received a close ack from DN2.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_47() throws IOException {
-    run47_48(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws an OutOfMemoryException right after
-   * it received a close ack from DN1.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_48() throws IOException {
-    run47_48(FiTestUtil.getMethodName(), 0);
-  }
-
-  private static void runBlockFileCloseTest(String methodName,
-      Action<DatanodeID, IOException> a) throws IOException {
-    FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
-        .initTest();
-    t.fiBlockFileClose.set(a);
-    write1byte(methodName);
-  }
-
-  private static void run49_51(String name, int i) throws IOException {
-    runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_49() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 0);
-  }
-
-
-  /**
-   * Pipeline close:
-   * DN1 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_50() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN2 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN2 bad.
-   */
-  @Test
-  public void pipeline_Fi_51() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 2);
-  }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Wed Apr 20 21:00:45 2011
@@ -23,13 +23,13 @@ import java.util.Random;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.log4j.Level;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -71,6 +70,7 @@ public class TestFiDataTransferProtocol2
   {
     ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
   }
   /**
    * 1. create files with dfs
@@ -88,8 +88,8 @@ public class TestFiDataTransferProtocol2
     FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
         + ", lastPacketSize=" + lastPacketSize);
 
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).build();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Wed Apr 20 21:00:45 2011
@@ -19,76 +19,29 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
+import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.junit.Assert;
 import org.junit.Test;
 
 /** Test DataTransferProtocol with fault injection. */
 public class TestFiPipelineClose {
-  static final short REPLICATION = 3;
-  static final long BLOCKSIZE = 1L * (1L << 20);
-
-  static final Configuration conf = new HdfsConfiguration();
-  static {
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
-    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
-  }
-
-  static private FSDataOutputStream createFile(FileSystem fs, Path p
-      ) throws IOException {
-    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
-        REPLICATION, BLOCKSIZE);
-  }
-
-  /**
-   * 1. create files with dfs
-   * 2. write 1 byte
-   * 3. close file
-   * 4. open the same file
-   * 5. read the 1 byte and compare results
-   */
-  private static void write1byte(String methodName) throws IOException {
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).format(true).build();
-    final FileSystem dfs = cluster.getFileSystem();
-    try {
-      final Path p = new Path("/" + methodName + "/foo");
-      final FSDataOutputStream out = createFile(dfs, p);
-      out.write(1);
-      out.close();
-      
-      final FSDataInputStream in = dfs.open(p);
-      final int b = in.read();
-      in.close();
-      Assert.assertEquals(1, b);
-    }
-    finally {
-      dfs.close();
-      cluster.shutdown();
-    }
-  }
-
-   private static void runPipelineCloseTest(String methodName,
+  private static void runPipelineCloseTest(String methodName,
       Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
     t.fiPipelineClose.set(a);
-    write1byte(methodName);
+    TestFiDataTransferProtocol.write1byte(methodName);
   }
 
   /**
@@ -123,4 +76,175 @@ public class TestFiPipelineClose {
     final String methodName = FiTestUtil.getMethodName();
     runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
   }
+
+  private static void run41_43(String name, int i) throws IOException {
+    runPipelineCloseTest(name, new SleepAction(name, i, 3000));
+  }
+
+  private static void runPipelineCloseAck(String name, int i, DataNodeAction a
+      ) throws IOException {
+    FiTestUtil.LOG.info("Running " + name + " ...");
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
+    final MarkerConstraint marker = new MarkerConstraint(name);
+    t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
+    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
+    TestFiDataTransferProtocol.write1byte(name);
+  }
+
+  private static void run39_40(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 never responses after received close ack DN2.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_39() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 never responses after received close ack DN1.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_40() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 0);
+  }
+  
+  /**
+   * Pipeline close with DN0 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_41() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 0);
+  }
+
+  /**
+   * Pipeline close with DN1 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_42() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close with DN2 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_43() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 2);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_44() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_45() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 1));
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_46() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 2));
+  }
+
+  private static void run47_48(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new OomAction(name, i));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException right after
+   * it received a close ack from DN2.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_47() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException right after
+   * it received a close ack from DN1.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_48() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 0);
+  }
+
+  private static void runBlockFileCloseTest(String methodName,
+      Action<DatanodeID, IOException> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiBlockFileClose.set(a);
+    TestFiDataTransferProtocol.write1byte(methodName);
+  }
+
+  private static void run49_51(String name, int i) throws IOException {
+    runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_49() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 0);
+  }
+
+
+  /**
+   * Pipeline close:
+   * DN1 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_50() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_51() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 2);
+  }
 }

Propchange: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 21:00:45 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696
+/hadoop/hdfs/trunk/src/test/hdfs:987665-1028906,1032470-1033639,1034073,1034082-1034181,1034501-1034544,1034932,1035141,1035143,1035145,1035163,1035386,1035410,1035508,1035515,1035552,1035718,1035795,1035841-1035842,1035890,1035920,1035924,1036132,1036213,1036303,1036310,1036631,1036692,1036767,1038222-1038859,1039957,1040005,1040027-1060619,1061067,1062011-1065960,1066305,1066970,1066986,1067079-1076024,1076696,1078925,1078940,1079069-1080836,1081580-1083021,1083043,1083084,1083234,1083902,1083951,1083958,1085460,1085509,1086454-1086461,1086479,1086654,1086693,1086820,1087080,1087096,1087115,1087160-1089696,1090114-1095461

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java?rev=1095512&r1=1095511&r2=1095512&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java Wed Apr 20 21:00:45 2011
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.cli;
 
-import org.apache.hadoop.cli.util.CLITestData.TestCmd;
+import org.apache.hadoop.cli.util.CLICommand;
 import org.apache.hadoop.cli.util.CommandExecutor.Result;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,7 +31,7 @@ import static org.junit.Assert.assertTru
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestHDFSCLI extends CLITestHelper {
+public class TestHDFSCLI extends CLITestHelperDFS {
 
   protected MiniDFSCluster dfsCluster = null;
   protected DistributedFileSystem dfs = null;
@@ -85,13 +85,13 @@ public class TestHDFSCLI extends CLITest
   protected String expandCommand(final String cmd) {
     String expCmd = cmd;
     expCmd = expCmd.replaceAll("NAMENODE", namenode);
-    expCmd = super.expandCommand(cmd);
+    expCmd = super.expandCommand(expCmd);
     return expCmd;
   }
   
   @Override
-  protected Result execute(TestCmd cmd) throws Exception {
-    return CmdFactoryDFS.getCommandExecutor(cmd, namenode).executeCommand(cmd.getCmd());
+  protected Result execute(CLICommand cmd) throws Exception {
+    return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
   }
 
   @Test