You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by tu...@apache.org on 2012/12/07 02:54:02 UTC

svn commit: r1418161 [2/3] - in /hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/...

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Dec  7 01:53:35 2012
@@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
@@ -111,6 +113,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
@@ -121,6 +124,7 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -163,12 +167,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
-import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
@@ -246,32 +248,32 @@ public class FSNamesystem implements Nam
       }
   };
 
-  private static final void logAuditEvent(UserGroupInformation ugi,
+  private boolean isAuditEnabled() {
+    return !isDefaultAuditLogger || auditLog.isInfoEnabled();
+  }
+
+  private void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
       HdfsFileStatus stat) {
     logAuditEvent(true, ugi, addr, cmd, src, dst, stat);
   }
 
-  private static final void logAuditEvent(boolean succeeded,
+  private void logAuditEvent(boolean succeeded,
       UserGroupInformation ugi, InetAddress addr, String cmd, String src,
       String dst, HdfsFileStatus stat) {
-    final StringBuilder sb = auditBuffer.get();
-    sb.setLength(0);
-    sb.append("allowed=").append(succeeded).append("\t");
-    sb.append("ugi=").append(ugi).append("\t");
-    sb.append("ip=").append(addr).append("\t");
-    sb.append("cmd=").append(cmd).append("\t");
-    sb.append("src=").append(src).append("\t");
-    sb.append("dst=").append(dst).append("\t");
-    if (null == stat) {
-      sb.append("perm=null");
-    } else {
-      sb.append("perm=");
-      sb.append(stat.getOwner()).append(":");
-      sb.append(stat.getGroup()).append(":");
-      sb.append(stat.getPermission());
+    FileStatus status = null;
+    if (stat != null) {
+      Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
+      Path path = dst != null ? new Path(dst) : new Path(src);
+      status = new FileStatus(stat.getLen(), stat.isDir(),
+          stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
+          stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
+          stat.getGroup(), symlink, path);
+    }
+    for (AuditLogger logger : auditLoggers) {
+      logger.logAuditEvent(succeeded, ugi.toString(), addr,
+          cmd, src, dst, status);
     }
-    auditLog.info(sb);
   }
 
   /**
@@ -304,6 +306,11 @@ public class FSNamesystem implements Nam
   final DelegationTokenSecretManager dtSecretManager;
   private final boolean alwaysUseDelegationTokensForTests;
   
+  // Tracks whether the default audit logger is the only configured audit
+  // logger; this allows isAuditEnabled() to return false in case the
+  // underlying logger is disabled, and avoid some unnecessary work.
+  private final boolean isDefaultAuditLogger;
+  private final List<AuditLogger> auditLoggers;
 
   /** The namespace tree. */
   FSDirectory dir;
@@ -536,12 +543,48 @@ public class FSNamesystem implements Nam
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
       this.safeMode = new SafeModeInfo(conf);
-
+      this.auditLoggers = initAuditLoggers(conf);
+      this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
+        auditLoggers.get(0) instanceof DefaultAuditLogger;
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
       throw e;
+    } catch (RuntimeException re) {
+      LOG.error(getClass().getSimpleName() + " initialization failed.", re);
+      close();
+      throw re;
+    }
+  }
+
+  private List<AuditLogger> initAuditLoggers(Configuration conf) {
+    // Initialize the custom access loggers if configured.
+    Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
+    List<AuditLogger> auditLoggers = Lists.newArrayList();
+    if (alClasses != null && !alClasses.isEmpty()) {
+      for (String className : alClasses) {
+        try {
+          AuditLogger logger;
+          if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
+            logger = new DefaultAuditLogger();
+          } else {
+            logger = (AuditLogger) Class.forName(className).newInstance();
+          }
+          logger.initialize(conf);
+          auditLoggers.add(logger);
+        } catch (RuntimeException re) {
+          throw re;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    // Make sure there is at least one logger installed.
+    if (auditLoggers.isEmpty()) {
+      auditLoggers.add(new DefaultAuditLogger());
     }
+    return auditLoggers;
   }
 
   void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
@@ -1003,8 +1046,8 @@ public class FSNamesystem implements Nam
       // start in active.
       return haEnabled;
     }
-  
-    return haContext.getState() instanceof StandbyState;
+
+    return HAServiceState.STANDBY == haContext.getState().getServiceState();
   }
 
   /**
@@ -1030,7 +1073,8 @@ public class FSNamesystem implements Nam
     long totalInodes = this.dir.totalInodes();
     long totalBlocks = this.getBlocksTotal();
     out.println(totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total");
+        + " blocks = " + (totalInodes + totalBlocks)
+        + " total filesystem objects");
 
     blockManager.metaSave(out);
   }
@@ -1076,7 +1120,7 @@ public class FSNamesystem implements Nam
     try {
       setPermissionInt(src, permission);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setPermission", src, null, null);
@@ -1098,14 +1142,14 @@ public class FSNamesystem implements Nam
       }
       checkOwner(src);
       dir.setPermission(src, permission);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setPermission", src, null, resultingStat);
@@ -1122,7 +1166,7 @@ public class FSNamesystem implements Nam
     try {
       setOwnerInt(src, username, group);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setOwner", src, null, null);
@@ -1153,14 +1197,14 @@ public class FSNamesystem implements Nam
         }
       }
       dir.setOwner(src, username, group);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(src, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setOwner", src, null, resultingStat);
@@ -1203,7 +1247,7 @@ public class FSNamesystem implements Nam
       return getBlockLocationsInt(src, offset, length, doAccessTime,
                                   needBlockToken, checkSafeMode);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "open", src, null, null);
@@ -1229,7 +1273,7 @@ public class FSNamesystem implements Nam
     }
     final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "open", src, null, null);
@@ -1310,7 +1354,7 @@ public class FSNamesystem implements Nam
     try {
       concatInt(target, srcs);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getLoginUser(),
                       getRemoteIp(),
                       "concat", Arrays.toString(srcs), target, null);
@@ -1353,14 +1397,14 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
       concatInternal(target, srcs);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(target, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     getRemoteIp(),
                     "concat", Arrays.toString(srcs), target, resultingStat);
@@ -1481,7 +1525,7 @@ public class FSNamesystem implements Nam
     try {
       setTimesInt(src, mtime, atime);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setTimes", src, null, null);
@@ -1507,7 +1551,7 @@ public class FSNamesystem implements Nam
       INode inode = dir.getINode(src);
       if (inode != null) {
         dir.setTimes(src, inode, mtime, atime, true);
-        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        if (isAuditEnabled() && isExternalInvocation()) {
           final HdfsFileStatus stat = dir.getFileInfo(src, false);
           logAuditEvent(UserGroupInformation.getCurrentUser(),
                         getRemoteIp(),
@@ -1530,7 +1574,7 @@ public class FSNamesystem implements Nam
     try {
       createSymlinkInt(target, link, dirPerms, createParent);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "createSymlink", link, target, null);
@@ -1551,14 +1595,14 @@ public class FSNamesystem implements Nam
         verifyParentDir(link);
       }
       createSymlinkInternal(target, link, dirPerms, createParent);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(link, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "createSymlink", link, target, resultingStat);
@@ -1614,7 +1658,7 @@ public class FSNamesystem implements Nam
     try {
       return setReplicationInt(src, replication);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "setReplication", src, null, null);
@@ -1650,7 +1694,7 @@ public class FSNamesystem implements Nam
     }
 
     getEditLog().logSync();
-    if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isFile && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "setReplication", src, null, null);
@@ -1706,7 +1750,7 @@ public class FSNamesystem implements Nam
       startFileInt(src, permissions, holder, clientMachine, flag, createParent,
                    replication, blockSize);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "create", src, null, null);
@@ -1739,7 +1783,7 @@ public class FSNamesystem implements Nam
       }
     } 
 
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
@@ -2040,7 +2084,7 @@ public class FSNamesystem implements Nam
     try {
       return appendFileInt(src, holder, clientMachine);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "append", src, null, null);
@@ -2086,7 +2130,7 @@ public class FSNamesystem implements Nam
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "append", src, null, null);
@@ -2532,7 +2576,7 @@ public class FSNamesystem implements Nam
     try {
       return renameToInt(src, dst);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "rename", src, dst, null);
@@ -2554,14 +2598,14 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
 
       status = renameToInternal(src, dst);
-      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (status && isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false);
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "rename", src, dst, resultingStat);
@@ -2583,15 +2627,15 @@ public class FSNamesystem implements Nam
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
+      //NOTE: yes, this is bad!  it's assuming much lower level behavior
+      //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
       checkParentAccess(src, FsAction.WRITE);
       checkAncestorAccess(actualdst, FsAction.WRITE);
     }
 
-    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
-      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
@@ -2611,14 +2655,14 @@ public class FSNamesystem implements Nam
       checkOperation(OperationCategory.WRITE);
 
       renameToInternal(src, dst, options);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         resultingStat = dir.getFileInfo(dst, false); 
       }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
@@ -2642,9 +2686,7 @@ public class FSNamesystem implements Nam
       checkAncestorAccess(dst, FsAction.WRITE);
     }
 
-    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
-    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -2659,7 +2701,7 @@ public class FSNamesystem implements Nam
     try {
       return deleteInt(src, recursive);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "delete", src, null, null);
@@ -2675,7 +2717,7 @@ public class FSNamesystem implements Nam
       NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     }
     boolean status = deleteInternal(src, recursive, true);
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "delete", src, null, null);
@@ -2841,7 +2883,7 @@ public class FSNamesystem implements Nam
       }
       stat = dir.getFileInfo(src, resolveLink);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "getfileinfo", src, null, null);
@@ -2850,7 +2892,7 @@ public class FSNamesystem implements Nam
     } finally {
       readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (isAuditEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
                     "getfileinfo", src, null, null);
@@ -2866,7 +2908,7 @@ public class FSNamesystem implements Nam
     try {
       return mkdirsInt(src, permissions, createParent);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "mkdirs", src, null, null);
@@ -2890,7 +2932,7 @@ public class FSNamesystem implements Nam
       writeUnlock();
     }
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+    if (status && isAuditEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     getRemoteIp(),
@@ -2979,9 +3021,11 @@ public class FSNamesystem implements Nam
   /** Persist all metadata about this file.
    * @param src The string representation of the path
    * @param clientName The string representation of the client
+   * @param lastBlockLength The length of the last block 
+   *                        under construction reported from client.
    * @throws IOException if path does not exist
    */
-  void fsync(String src, String clientName) 
+  void fsync(String src, String clientName, long lastBlockLength) 
       throws IOException, UnresolvedLinkException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     writeLock();
@@ -2991,6 +3035,9 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      if (lastBlockLength > 0) {
+        pendingFile.updateLengthOfLastBlock(lastBlockLength);
+      }
       dir.persistBlocks(src, pendingFile);
     } finally {
       writeUnlock();
@@ -3319,7 +3366,7 @@ public class FSNamesystem implements Nam
     try {
       return getListingInt(src, startAfter, needLocation);
     } catch (AccessControlException e) {
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "listStatus", src, null, null);
@@ -3343,7 +3390,7 @@ public class FSNamesystem implements Nam
           checkTraverse(src);
         }
       }
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       getRemoteIp(),
                       "listStatus", src, null, null);
@@ -3433,15 +3480,7 @@ public class FSNamesystem implements Nam
 
   private NNHAStatusHeartbeat createHaStatusHeartbeat() {
     HAState state = haContext.getState();
-    NNHAStatusHeartbeat.State hbState;
-    if (state instanceof ActiveState) {
-      hbState = NNHAStatusHeartbeat.State.ACTIVE;
-    } else if (state instanceof StandbyState) {
-      hbState = NNHAStatusHeartbeat.State.STANDBY;      
-    } else {
-      throw new AssertionError("Invalid state: " + state.getClass());
-    }
-    return new NNHAStatusHeartbeat(hbState,
+    return new NNHAStatusHeartbeat(state.getServiceState(),
         getFSImage().getLastAppliedOrWrittenTxId());
   }
 
@@ -3870,7 +3909,7 @@ public class FSNamesystem implements Nam
     private synchronized void leave() {
       // if not done yet, initialize replication queues.
       // In the standby, do not populate repl queues
-      if (!isPopulatingReplQueues() && !isInStandbyState()) {
+      if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
         initializeReplQueues();
       }
       long timeInSafemode = now() - startTime;
@@ -3913,7 +3952,8 @@ public class FSNamesystem implements Nam
      * initializing replication queues.
      */
     private synchronized boolean canInitializeReplQueues() {
-      return !isInStandbyState() && blockSafe >= blockReplQueueThreshold;
+      return shouldPopulateReplQueues()
+          && blockSafe >= blockReplQueueThreshold;
     }
       
     /** 
@@ -4253,7 +4293,7 @@ public class FSNamesystem implements Nam
 
   @Override
   public boolean isPopulatingReplQueues() {
-    if (isInStandbyState()) {
+    if (!shouldPopulateReplQueues()) {
       return false;
     }
     // safeMode is volatile, and may be set to null at any time
@@ -4262,7 +4302,13 @@ public class FSNamesystem implements Nam
       return true;
     return safeMode.isPopulatingReplQueues();
   }
-    
+
+  private boolean shouldPopulateReplQueues() {
+    if(haContext == null || haContext.getState() == null)
+      return false;
+    return haContext.getState().shouldPopulateReplQueues();
+  }
+
   @Override
   public void incrementSafeBlockCount(int replication) {
     // safeMode is volatile, and may be set to null at any time
@@ -4880,31 +4926,9 @@ public class FSNamesystem implements Nam
 
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
-  void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
-    String overwrite;
-    String replaceBy;
+  void unprotectedChangeLease(String src, String dst) {
     assert hasWriteLock();
-
-    boolean destinationExisted = true;
-    if (dinfo == null) {
-      destinationExisted = false;
-    }
-
-    if (destinationExisted && dinfo.isDir()) {
-      Path spath = new Path(src);
-      Path parent = spath.getParent();
-      if (parent.isRoot()) {
-        overwrite = parent.toString();
-      } else {
-        overwrite = parent.toString() + Path.SEPARATOR;
-      }
-      replaceBy = dst + Path.SEPARATOR;
-    } else {
-      overwrite = src;
-      replaceBy = dst;
-    }
-
-    leaseManager.changeLease(src, dst, overwrite, replaceBy);
+    leaseManager.changeLease(src, dst);
   }
 
   /**
@@ -4915,19 +4939,13 @@ public class FSNamesystem implements Nam
     // lock on our behalf. If we took the read lock here, we could block
     // for fairness if a writer is waiting on the lock.
     synchronized (leaseManager) {
-      out.writeInt(leaseManager.countPath()); // write the size
-
-      for (Lease lease : leaseManager.getSortedLeases()) {
-        for(String path : lease.getPaths()) {
-          // verify that path exists in namespace
-          final INodeFileUnderConstruction cons;
-          try {
-            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
-          } catch (UnresolvedLinkException e) {
-            throw new AssertionError("Lease files should reside on this FS");
-          }
-          FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
-        }
+      Map<String, INodeFileUnderConstruction> nodes =
+          leaseManager.getINodesUnderConstruction();
+      out.writeInt(nodes.size()); // write the size    
+      for (Map.Entry<String, INodeFileUnderConstruction> entry
+           : nodes.entrySet()) {
+        FSImageSerialization.writeINodeUnderConstruction(
+            out, entry.getValue(), entry.getKey());
       }
     }
   }
@@ -5286,7 +5304,7 @@ public class FSNamesystem implements Nam
    * Log fsck event in the audit log 
    */
   void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
-    if (auditLog.isInfoEnabled()) {
+    if (isAuditEnabled()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     remoteAddress,
                     "fsck", src, null, null);
@@ -5541,4 +5559,44 @@ public class FSNamesystem implements Nam
     return this.blockManager.getDatanodeManager()
         .isAvoidingStaleDataNodesForWrite();
   }
+
+  /**
+   * Default AuditLogger implementation; used when no access logger is
+   * defined in the config file. It can also be explicitly listed in the
+   * config file.
+   */
+  private static class DefaultAuditLogger implements AuditLogger {
+
+    @Override
+    public void initialize(Configuration conf) {
+      // Nothing to do.
+    }
+
+    @Override
+    public void logAuditEvent(boolean succeeded, String userName,
+        InetAddress addr, String cmd, String src, String dst,
+        FileStatus status) {
+      if (auditLog.isInfoEnabled()) {
+        final StringBuilder sb = auditBuffer.get();
+        sb.setLength(0);
+        sb.append("allowed=").append(succeeded).append("\t");
+        sb.append("ugi=").append(userName).append("\t");
+        sb.append("ip=").append(addr).append("\t");
+        sb.append("cmd=").append(cmd).append("\t");
+        sb.append("src=").append(src).append("\t");
+        sb.append("dst=").append(dst).append("\t");
+        if (null == status) {
+          sb.append("perm=null");
+        } else {
+          sb.append("perm=");
+          sb.append(status.getOwner()).append(":");
+          sb.append(status.getGroup()).append(":");
+          sb.append(status.getPermission());
+        }
+        auditLog.info(sb);
+      }
+    }
+
+  }
+
 }

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Fri Dec  7 01:53:35 2012
@@ -73,6 +73,11 @@ class INodeDirectory extends INode {
   INodeDirectory(INodeDirectory other) {
     super(other);
     this.children = other.children;
+    if (this.children != null) {
+      for (INode child : children) {
+        child.parent = this;
+      }
+    }
   }
   
   /** @return true unconditionally. */
@@ -106,6 +111,7 @@ class INodeDirectory extends INode {
 
     final int low = searchChildren(newChild);
     if (low>=0) { // an old child exists so replace by the newChild
+      children.get(low).parent = null;
       children.set(low, newChild);
     } else {
       throw new IllegalArgumentException("No child exists to be replaced");

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Fri Dec  7 01:53:35 2012
@@ -171,4 +171,22 @@ class INodeFileUnderConstruction extends
     setBlock(numBlocks()-1, ucBlock);
     return ucBlock;
   }
+
+  /**
+   * Update the length for the last block
+   * 
+   * @param lastBlockLength
+   *          The length of the last block reported from client
+   * @throws IOException
+   */
+  void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
+    BlockInfo lastBlock = this.getLastBlock();
+    assert (lastBlock != null) : "The last block for path "
+        + this.getFullPathName() + " is null when updating its length";
+    assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
+        + this.getFullPathName()
+        + " is not a BlockInfoUnderConstruction when updating its length";
+    lastBlock.setNumBytes(lastBlockLength);
+  }
+  
 }

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Dec  7 01:53:35 2012
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.now;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -39,8 +42,6 @@ import org.apache.hadoop.util.Daemon;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.util.Time.now;
-
 /**
  * LeaseManager does the lease housekeeping for writing on files.   
  * This class also provides useful static methods for lease recovery.
@@ -330,21 +331,19 @@ public class LeaseManager {
     }
   }
 
-  synchronized void changeLease(String src, String dst,
-      String overwrite, String replaceBy) {
+  synchronized void changeLease(String src, String dst) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".changelease: " +
-               " src=" + src + ", dest=" + dst + 
-               ", overwrite=" + overwrite +
-               ", replaceBy=" + replaceBy);
+               " src=" + src + ", dest=" + dst);
     }
 
-    final int len = overwrite.length();
-    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(src, sortedLeasesByPath)) {
+    final int len = src.length();
+    for(Map.Entry<String, Lease> entry
+        : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
       final String oldpath = entry.getKey();
       final Lease lease = entry.getValue();
-      //overwrite must be a prefix of oldpath
-      final String newpath = replaceBy + oldpath.substring(len);
+      // replace stem of src with new destination
+      final String newpath = dst + oldpath.substring(len);
       if (LOG.isDebugEnabled()) {
         LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
       }
@@ -355,7 +354,8 @@ public class LeaseManager {
   }
 
   synchronized void removeLeaseWithPrefixPath(String prefix) {
-    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(prefix, sortedLeasesByPath)) {
+    for(Map.Entry<String, Lease> entry
+        : findLeaseWithPrefixPath(prefix, sortedLeasesByPath).entrySet()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(LeaseManager.class.getSimpleName()
             + ".removeLeaseWithPrefixPath: entry=" + entry);
@@ -364,13 +364,13 @@ public class LeaseManager {
     }
   }
 
-  static private List<Map.Entry<String, Lease>> findLeaseWithPrefixPath(
+  static private Map<String, Lease> findLeaseWithPrefixPath(
       String prefix, SortedMap<String, Lease> path2lease) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
     }
 
-    List<Map.Entry<String, Lease>> entries = new ArrayList<Map.Entry<String, Lease>>();
+    final Map<String, Lease> entries = new HashMap<String, Lease>();
     final int srclen = prefix.length();
 
     for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
@@ -379,7 +379,7 @@ public class LeaseManager {
         return entries;
       }
       if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
-        entries.add(entry);
+        entries.put(entry.getKey(), entry.getValue());
       }
     }
     return entries;
@@ -426,6 +426,26 @@ public class LeaseManager {
     }
   }
 
+  /**
+   * Get the list of inodes corresponding to valid leases.
+   * @return list of inodes
+   * @throws UnresolvedLinkException
+   */
+  Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() {
+    Map<String, INodeFileUnderConstruction> inodes =
+        new TreeMap<String, INodeFileUnderConstruction>();
+    for (String p : sortedLeasesByPath.keySet()) {
+      // verify that path exists in namespace
+      try {
+        INode node = fsnamesystem.dir.getINode(p);
+        inodes.put(p, INodeFileUnderConstruction.valueOf(node, p));
+      } catch (IOException ioe) {
+        LOG.error(ioe);
+      }
+    }
+    return inodes;
+  }
+  
   /** Check the leases beginning from the oldest.
    *  @return true is sync is needed.
    */

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Dec  7 01:53:35 2012
@@ -598,11 +598,7 @@ public class NameNode {
     String nsId = getNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
-    if (!haEnabled) {
-      state = ACTIVE_STATE;
-    } else {
-      state = STANDBY_STATE;
-    }
+    state = createHAState();
     this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
     this.haContext = createHAContext();
     try {
@@ -619,6 +615,10 @@ public class NameNode {
     }
   }
 
+  protected HAState createHAState() {
+    return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
+  }
+
   protected HAContext createHAContext() {
     return new NameNodeHAContext();
   }
@@ -1298,7 +1298,7 @@ public class NameNode {
    *          before exit.
    * @throws ExitException thrown only for testing.
    */
-  private synchronized void doImmediateShutdown(Throwable t)
+  protected synchronized void doImmediateShutdown(Throwable t)
       throws ExitException {
     String message = "Error encountered requiring NN shutdown. " +
         "Shutting down immediately.";

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Dec  7 01:53:35 2012
@@ -825,8 +825,9 @@ class NameNodeRpcServer implements Namen
   }
   
   @Override // ClientProtocol
-  public void fsync(String src, String clientName) throws IOException {
-    namesystem.fsync(src, clientName);
+  public void fsync(String src, String clientName, long lastBlockLength)
+      throws IOException {
+    namesystem.fsync(src, clientName, lastBlockLength);
   }
 
   @Override // ClientProtocol

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Fri Dec  7 01:53:35 2012
@@ -102,7 +102,7 @@ class NamenodeJspHelper {
     long usedNonHeap = (totalNonHeap * 100) / commitedNonHeap;
 
     String str = "<div>" + inodes + " files and directories, " + blocks + " blocks = "
-        + (inodes + blocks) + " total";
+        + (inodes + blocks) + " total filesystem objects";
     if (maxobjects != 0) {
       long pct = ((inodes + blocks) * 100) / maxobjects;
       str += " / " + maxobjects + " (" + pct + "%)";

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java Fri Dec  7 01:53:35 2012
@@ -19,31 +19,26 @@ package org.apache.hadoop.hdfs.server.pr
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class NNHAStatusHeartbeat {
 
-  private State state;
+  private HAServiceState state;
   private long txid = HdfsConstants.INVALID_TXID;
   
-  public NNHAStatusHeartbeat(State state, long txid) {
+  public NNHAStatusHeartbeat(HAServiceState state, long txid) {
     this.state = state;
     this.txid = txid;
   }
 
-  public State getState() {
+  public HAServiceState getState() {
     return state;
   }
   
   public long getTxId() {
     return txid;
   }
-  
-  @InterfaceAudience.Private
-  public enum State {
-    ACTIVE,
-    STANDBY;
-  }
 }

Propchange: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1414747-1418159

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Fri Dec  7 01:53:35 2012
@@ -357,6 +357,7 @@ message SetQuotaResponseProto { // void 
 message FsyncRequestProto {
   required string src = 1;
   required string client = 2;
+  optional sint64 lastBlockLength = 3 [default = -1];
 }
 
 message FsyncResponseProto { // void response

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Dec  7 01:53:35 2012
@@ -1184,4 +1184,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.audit.loggers</name>
+  <value>default</value>
+  <description>
+    List of classes implementing audit loggers that will receive audit events.
+    These should be implementations of org.apache.hadoop.hdfs.server.namenode.AuditLogger.
+    The special value "default" can be used to reference the default audit
+    logger, which uses the configured log system. Installing custom audit loggers
+    may affect the performance and stability of the NameNode. Refer to the custom
+    logger's documentation for more details.
+  </description>
+</property>
+
 </configuration>

Propchange: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1414747-1418159

Propchange: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1414747-1418159

Propchange: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1414747-1418159

Propchange: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1414747-1418159

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Dec  7 01:53:35 2012
@@ -321,7 +321,7 @@ public class MiniDFSCluster {
   /**
    * Used by builder to create and return an instance of MiniDFSCluster
    */
-  private MiniDFSCluster(Builder builder) throws IOException {
+  protected MiniDFSCluster(Builder builder) throws IOException {
     if (builder.nnTopology == null) {
       // If no topology is specified, build a single NN. 
       builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -369,8 +369,8 @@ public class MiniDFSCluster {
 
   private Configuration conf;
   private NameNodeInfo[] nameNodes;
-  private int numDataNodes;
-  private ArrayList<DataNodeProperties> dataNodes = 
+  protected int numDataNodes;
+  protected List<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
@@ -2303,7 +2303,7 @@ public class MiniDFSCluster {
     return port;
   }
   
-  private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+  protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {
     if (setupHostsFile) {
       String hostsFile = conf.get(DFS_HOSTS, "").trim();

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java Fri Dec  7 01:53:35 2012
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -43,16 +45,21 @@ public class TestHFlush {
   
   private final String fName = "hflushtest.dat";
   
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
    * to write a file with a standard block size
    */
   @Test
   public void hFlush_01() throws IOException {
-    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE, (short)2);
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, false, EnumSet.noneOf(SyncFlag.class));
   }
 
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
-   * to write a file with a custom block size so the writes will be
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
+   * to write a file with a custom block size so the writes will be 
    * happening across block' boundaries
    */
   @Test
@@ -64,14 +71,17 @@ public class TestHFlush {
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
 
-    doTheJob(conf, fName, customBlockSize, (short)2);
+    doTheJob(conf, fName, customBlockSize, (short) 2, false,
+        EnumSet.noneOf(SyncFlag.class));
   }
 
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
-   * to write a file with a custom block size so the writes will be
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
+   * to write a file with a custom block size so the writes will be 
    * happening across block's and checksum' boundaries
    */
- @Test
+  @Test
   public void hFlush_03() throws IOException {
     Configuration conf = new HdfsConfiguration();
     int customPerChecksumSize = 400;
@@ -80,22 +90,106 @@ public class TestHFlush {
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
 
-    doTheJob(conf, fName, customBlockSize, (short)2);
+    doTheJob(conf, fName, customBlockSize, (short) 2, false,
+        EnumSet.noneOf(SyncFlag.class));
+  }
+
+  /**
+   * Test hsync (with updating block length in NameNode) while no data is
+   * actually written yet
+   */
+  @Test
+  public void hSyncUpdateLength_00() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        2).build();
+    DistributedFileSystem fileSystem =
+        (DistributedFileSystem)cluster.getFileSystem();
+    
+    try {
+      Path path = new Path(fName);
+      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
+          AppendTestUtil.BLOCK_SIZE);
+      System.out.println("Created file " + path.toString());
+      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.UPDATE_LENGTH));
+      long currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(0L, currentFileLength);
+      stm.close();
+    } finally {
+      fileSystem.close();
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   */
+  @Test
+  public void hSyncUpdateLength_01() throws IOException {
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
   }
 
   /**
-    The method starts new cluster with defined Configuration;
-    creates a file with specified block_size and writes 10 equal sections in it;
-    it also calls hflush() after each write and throws an IOException in case of 
-    an error.
-    @param conf cluster configuration
-    @param fileName of the file to be created and processed as required
-    @param block_size value to be used for the file's creation
-    @param replicas is the number of replicas
-    @throws IOException in case of any errors 
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   * Similar with {@link #hFlush_02()} , it writes a file with a custom block
+   * size so the writes will be happening across block' boundaries
+   */
+  @Test
+  public void hSyncUpdateLength_02() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+  }
+  
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   * Similar with {@link #hFlush_03()} , it writes a file with a custom block
+   * size so the writes will be happening across block's and checksum'
+   * boundaries.
+   */
+  @Test
+  public void hSyncUpdateLength_03() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+  }
+  
+  /**
+   * The method starts new cluster with defined Configuration; creates a file
+   * with specified block_size and writes 10 equal sections in it; it also calls
+   * hflush/hsync after each write and throws an IOException in case of an error.
+   * 
+   * @param conf cluster configuration
+   * @param fileName of the file to be created and processed as required
+   * @param block_size value to be used for the file's creation
+   * @param replicas is the number of replicas
+   * @param isSync hsync or hflush         
+   * @param syncFlags specify the semantic of the sync/flush
+   * @throws IOException in case of any errors
    */
   public static void doTheJob(Configuration conf, final String fileName,
-                              long block_size, short replicas) throws IOException {
+      long block_size, short replicas, boolean isSync,
+      EnumSet<SyncFlag> syncFlags) throws IOException {
     byte[] fileContent;
     final int SECTIONS = 10;
 
@@ -119,8 +213,21 @@ public class TestHFlush {
         System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
         // write to the file
         stm.write(fileContent, tenth * i, tenth);
-        // Wait while hflush() pushes all packets through built pipeline
-        ((DFSOutputStream)stm.getWrappedStream()).hflush();
+        
+        // Wait while hflush/hsync pushes all packets through built pipeline
+        if (isSync) {
+          ((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
+        } else {
+          ((DFSOutputStream)stm.getWrappedStream()).hflush();
+        }
+        
+        // Check file length if updatelength is required
+        if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
+          long currentFileLength = fileSystem.getFileStatus(path).getLen();
+          assertEquals(
+            "File size doesn't match for hsync/hflush with updating the length",
+            tenth * (i + 1), currentFileLength);
+        }
         byte [] toRead = new byte[tenth];
         byte [] expected = new byte[tenth];
         System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
@@ -139,8 +246,6 @@ public class TestHFlush {
 
       assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
       AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
-    } catch (Exception e) {
-      e.printStackTrace();
     } finally {
       fileSystem.close();
       cluster.shutdown();

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java Fri Dec  7 01:53:35 2012
@@ -30,7 +30,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -49,6 +51,10 @@ public class TestLease {
         ).getLeaseByPath(src.toString()) != null;
   }
 
+  static int leaseCount(MiniDFSCluster cluster) {
+    return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()).countLease();
+  }
+  
   static final String dirString = "/test/lease";
   final Path dir = new Path(dirString);
   static final Log LOG = LogFactory.getLog(TestLease.class);
@@ -127,6 +133,96 @@ public class TestLease {
   }
 
   @Test
+  public void testLeaseAfterRename() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    try {
+      Path p = new Path("/test-file");
+      Path d = new Path("/test-d");
+      Path d2 = new Path("/test-d-other");
+
+      // open a file to get a lease
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(p);
+      out.writeBytes("something");
+      //out.hsync();
+      Assert.assertTrue(hasLease(cluster, p));
+      Assert.assertEquals(1, leaseCount(cluster));
+      
+      // just to ensure first fs doesn't have any logic to twiddle leases
+      DistributedFileSystem fs2 = (DistributedFileSystem) FileSystem.newInstance(fs.getUri(), fs.getConf());
+
+      // rename the file into an existing dir
+      LOG.info("DMS: rename file into dir");
+      Path pRenamed = new Path(d, p.getName());
+      fs2.mkdirs(d);
+      fs2.rename(p, pRenamed);
+      Assert.assertFalse(p+" exists", fs2.exists(p));
+      Assert.assertTrue(pRenamed+" not found", fs2.exists(pRenamed));
+      Assert.assertFalse("has lease for "+p, hasLease(cluster, p));
+      Assert.assertTrue("no lease for "+pRenamed, hasLease(cluster, pRenamed));
+      Assert.assertEquals(1, leaseCount(cluster));
+    
+      // rename the parent dir to a new non-existent dir
+      LOG.info("DMS: rename parent dir");
+      Path pRenamedAgain = new Path(d2, pRenamed.getName());
+      fs2.rename(d, d2);
+      // src gone
+      Assert.assertFalse(d+" exists", fs2.exists(d));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d2+" not found", fs2.exists(d2));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+
+      // rename the parent dir to existing dir
+      // NOTE: rename w/o options moves paths into existing dir
+      LOG.info("DMS: rename parent again");
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(new Path(d, d2.getName()), p.getName());      
+      fs2.mkdirs(d);
+      fs2.rename(d2, d);
+      // src gone
+      Assert.assertFalse(d2+" exists", fs2.exists(d2));
+      Assert.assertFalse("no lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d+" not found", fs2.exists(d));
+      Assert.assertTrue(pRenamedAgain +" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+      
+      // rename with opts to non-existent dir
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(d2, p.getName());
+      fs2.rename(pRenamed.getParent(), d2, Options.Rename.OVERWRITE);
+      // src gone
+      Assert.assertFalse(pRenamed.getParent() +" not found", fs2.exists(pRenamed.getParent()));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d2+" not found", fs2.exists(d2));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+
+      // rename with opts to existing dir
+      // NOTE: rename with options will not move paths into the existing dir
+      pRenamed = pRenamedAgain;
+      pRenamedAgain = new Path(d, p.getName());
+      fs2.rename(pRenamed.getParent(), d, Options.Rename.OVERWRITE);
+      // src gone
+      Assert.assertFalse(pRenamed.getParent() +" not found", fs2.exists(pRenamed.getParent()));
+      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
+      // dst checks
+      Assert.assertTrue(d+" not found", fs2.exists(d));
+      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
+      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
+      Assert.assertEquals(1, leaseCount(cluster));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
   public void testLease() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     try {

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Fri Dec  7 01:53:35 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,8 @@ import org.junit.Test;
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
+  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
+  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
   private static final Configuration CONF = new HdfsConfiguration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
@@ -61,6 +64,32 @@ public class TestReplicationPolicyWithNo
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
       DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
   };
+  
+  private final static DatanodeDescriptor dataNodesInBoundaryCase[] = 
+          new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
+  };
+  
+  private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
+          new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
+      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
+      DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
+      DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
+      DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+  };
 
   private final static DatanodeDescriptor NODE = 
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
@@ -74,6 +103,12 @@ public class TestReplicationPolicyWithNo
           "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
       CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
           "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+      
+      File baseDir = new File(System.getProperty(
+          "test.build.data", "build/test/data"), "dfs/");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          new File(baseDir, "name").getPath());
+      
       DFSTestUtil.formatNameNode(CONF);
       namenode = new NameNode(CONF);
     } catch (IOException e) {
@@ -97,7 +132,27 @@ public class TestReplicationPolicyWithNo
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
   }
-
+  
+  /**
+   * Scan the targets list: all targets should be on different NodeGroups.
+   * Return false if two targets are found on the same NodeGroup.
+   */
+  private static boolean checkTargetsOnDifferentNodeGroup(
+      DatanodeDescriptor[] targets) {
+    if(targets.length == 0)
+      return true;
+    Set<String> targetSet = new HashSet<String>();
+    for(DatanodeDescriptor node:targets) {
+      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
+      if(targetSet.contains(nodeGroup)) {
+        return false;
+      } else {
+        targetSet.add(nodeGroup);
+      }
+    }
+    return true;
+  }
+  
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -497,5 +552,122 @@ public class TestReplicationPolicyWithNo
         null, null, (short)1, first, second);
     assertEquals(chosenNode, dataNodes[5]);
   }
+  
+  /**
+   * Test replica placement policy in case of boundary topology.
+   * Rack 2 has only 1 node group & can't be placed with two replicas
+   * The 1st replica will be placed on writer.
+   * The 2nd replica should be placed on a different rack 
+   * The 3rd replica should be placed on the same rack with writer, but on a 
+   * different node group.
+   */
+  @Test
+  public void testChooseTargetsOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      cluster.add(dataNodesInBoundaryCase[i]);
+    }
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+        dataNodes[0].updateHeartbeat(
+                2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+                (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+        
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test re-replication policy in boundary case.
+   * Rack 2 has only one node group & the node in this node group is chosen
+   * Rack 1 has two nodegroups & one of them is chosen.
+   * Replica policy should choose the node from node group of Rack1 but not the
+   * same nodegroup with chosen nodes.
+   */
+  @Test
+  public void testRereplicateOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodesInBoundaryCase[0]);
+    chosenNodes.add(dataNodesInBoundaryCase[5]);
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        chosenNodes, BLOCK_SIZE);
+    assertFalse(cluster.isOnSameNodeGroup(targets[0], 
+        dataNodesInBoundaryCase[0]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[5]));
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+  
+  /**
+   * Test replica placement policy in case of targets more than number of 
+   * NodeGroups.
+   * The 12-nodes cluster only has 6 NodeGroups, but in some cases, like: 
+   * placing submitted job file, there is requirement to choose more (10) 
+   * targets for placing replica. We should test it can return 6 targets.
+   */
+  @Test
+  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
+    // Cleanup nodes in previous tests
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      cluster.add(dataNodesInMoreTargetsCase[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      dataNodesInMoreTargetsCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    // Test normal case -- 3 replicas
+    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+
+    // Test special case -- replica number over node groups.
+    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+    // Verify it only can find 6 targets for placing replicas.
+    assertEquals(targets.length, 6);
+  }
+
 
 }

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java Fri Dec  7 01:53:35 2012
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
-import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -123,7 +123,7 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt());
-    mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0);
+    mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     return mock;
   }
   
@@ -255,12 +255,12 @@ public class TestBPOfferService {
       assertNull(bpos.getActiveNN());
 
       // Have NN1 claim active at txid 1
-      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1);
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN1, bpos.getActiveNN());
 
       // NN2 claims active at a higher txid
-      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2);
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 2);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN2, bpos.getActiveNN());
       
@@ -272,12 +272,12 @@ public class TestBPOfferService {
       // Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
       // because NN1's txid is lower than the last active txid. Instead,
       // it should consider neither active.
-      mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2);
+      mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 2);
       bpos.triggerHeartbeatForTests();
       assertNull(bpos.getActiveNN());
       
       // Now if NN1 goes back to a higher txid, it should be considered active
-      mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3);
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 3);
       bpos.triggerHeartbeatForTests();
       assertSame(mockNN1, bpos.getActiveNN());
 

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Fri Dec  7 01:53:35 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -72,7 +73,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
-import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -157,7 +157,7 @@ public class TestBlockRecovery {
             Mockito.anyInt()))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
-            new NNHAStatusHeartbeat(State.ACTIVE, 1)));
+            new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1)));
 
     dn = new DataNode(conf, dirs, null) {
       @Override

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Dec  7 01:53:35 2012
@@ -179,6 +179,13 @@ public class NameNodeAdapter {
     return spy;
   }
   
+  public static JournalSet spyOnJournalSet(NameNode nn) {
+    FSEditLog editLog = nn.getFSImage().getEditLog();
+    JournalSet js = Mockito.spy(editLog.getJournalSet());
+    editLog.setJournalSetForTesting(js);
+    return js;
+  }
+  
   public static String getMkdirOpPath(FSEditLogOp op) {
     if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
       return ((MkdirOp) op).path;

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java Fri Dec  7 01:53:35 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -103,6 +104,9 @@ public class TestBackupNode {
     BackupNode bn = (BackupNode)NameNode.createNameNode(
         new String[]{startupOpt.getName()}, c);
     assertTrue(bn.getRole() + " must be in SafeMode.", bn.isInSafeMode());
+    assertTrue(bn.getRole() + " must be in StandbyState",
+               bn.getNamesystem().getHAState()
+                 .equalsIgnoreCase(HAServiceState.STANDBY.name()));
     return bn;
   }
 

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Fri Dec  7 01:53:35 2012
@@ -25,10 +25,15 @@ import static org.junit.Assert.fail;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.junit.Test;
 
@@ -157,6 +162,48 @@ public class TestINodeFile {
     
   }
   
+  /**
+   * FSDirectory#unprotectedSetQuota creates a new INodeDirectoryWithQuota to
+   * replace the original INodeDirectory. Before HDFS-4243, the parent field of
+   * all the children INodes of the target INodeDirectory is not changed to
+   * point to the new INodeDirectoryWithQuota. This testcase tests this
+   * scenario.
+   */
+  @Test
+  public void testGetFullPathNameAfterSetQuota() throws Exception {
+    long fileLen = 1024;
+    replication = 3;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replication).build();
+    cluster.waitActive();
+    FSNamesystem fsn = cluster.getNamesystem();
+    FSDirectory fsdir = fsn.getFSDirectory();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    
+    // Create a file for test
+    final Path dir = new Path("/dir");
+    final Path file = new Path(dir, "file");
+    DFSTestUtil.createFile(dfs, file, fileLen, replication, 0L);
+    
+    // Check the full path name of the INode associating with the file
+    INode fnode = fsdir.getINode(file.toString());
+    assertEquals(file.toString(), fnode.getFullPathName());
+    
+    // Call FSDirectory#unprotectedSetQuota which calls
+    // INodeDirectory#replaceChild
+    dfs.setQuota(dir, Long.MAX_VALUE - 1, replication * fileLen * 10);
+    final Path newDir = new Path("/newdir");
+    final Path newFile = new Path(newDir, "file");
+    // Also rename dir
+    dfs.rename(dir, newDir, Options.Rename.OVERWRITE);
+    // /dir/file now should be renamed to /newdir/file
+    fnode = fsdir.getINode(newFile.toString());
+    // getFullPathName can return correct result only if the parent field of
+    // child node is set correctly
+    assertEquals(newFile.toString(), fnode.getFullPathName());
+  }
+  
   @Test
   public void testAppendBlocks() {
     INodeFile origFile = createINodeFiles(1, "origfile")[0];

Modified: hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java Fri Dec  7 01:53:35 2012
@@ -92,7 +92,8 @@ public class TestMetaSave {
     DataInputStream in = new DataInputStream(fstream);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in));
     String line = reader.readLine();
-    assertTrue(line.equals("3 files and directories, 2 blocks = 5 total"));
+    assertTrue(line.equals(
+      "3 files and directories, 2 blocks = 5 total filesystem objects"));
     line = reader.readLine();
     assertTrue(line.equals("Live Datanodes: 1"));
     line = reader.readLine();