You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/07/30 10:01:01 UTC

svn commit: r1508336 [1/2] - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/m...

Author: suresh
Date: Tue Jul 30 08:01:00 2013
New Revision: 1508336

URL: http://svn.apache.org/r1508336
Log:
HDFS-5025. Merge 1508335 from branch-2

Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
      - copied unchanged from r1508335, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jul 30 08:01:00 2013
@@ -77,6 +77,9 @@ Release 2.1.0-beta - 2013-07-02
     protocol methods. (suresh)
 
     HDFS-4979. Implement retry cache on Namenode. (suresh)
+
+    HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding 
+    retry cache in case of HA failover. (Jing Zhao via suresh)
     
   IMPROVEMENTS
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jul 30 08:01:00 2013
@@ -449,7 +449,8 @@ public class DFSClient implements java.i
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
    * Exactly one of nameNodeUri or rpcNamenode must be null.
    */
-  DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
+  @VisibleForTesting
+  public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
       Configuration conf, FileSystem.Statistics stats)
     throws IOException {
     // Copy only the required DFSClient configuration

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Jul 30 08:01:00 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.tools.GetUserMa
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -307,7 +308,8 @@ public class NameNodeProxies {
   }
 
   /** Gets the configured Failover proxy provider's class */
-  private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+  @VisibleForTesting
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
       Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
     if (nameNodeUri == null) {
       return null;
@@ -344,7 +346,8 @@ public class NameNodeProxies {
   }
 
   /** Creates the Failover proxy provider instance*/
-  private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+  @VisibleForTesting
+  public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
       Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
       Class<T> xface, URI nameNodeUri) throws IOException {
     Preconditions.checkArgument(

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Tue Jul 30 08:01:00 2013
@@ -104,7 +104,9 @@ public class LayoutVersion {
     OPTIMIZE_SNAPSHOT_INODES(-45, -43,
         "Reduce snapshot inode memory footprint", false),
     SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
-        "block IDs in the edits log and image files");
+        "block IDs in the edits log and image files"),
+    EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to " 
+        + "enable rebuilding retry cache in case of HA failover");
     
     final int lv;
     final int ancestorLV;

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jul 30 08:01:00 2013
@@ -381,12 +381,13 @@ public class FSDirectory implements Clos
   /**
    * Persist the block list for the inode.
    */
-  void persistBlocks(String path, INodeFileUnderConstruction file) {
+  void persistBlocks(String path, INodeFileUnderConstruction file,
+      boolean logRetryCache) {
     waitForReady();
 
     writeLock();
     try {
-      fsImage.getEditLog().logUpdateBlocks(path, file);
+      fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
             +path+" with "+ file.getBlocks().length 
@@ -459,7 +460,7 @@ public class FSDirectory implements Clos
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
   @Deprecated
-  boolean renameTo(String src, String dst) 
+  boolean renameTo(String src, String dst, boolean logRetryCache) 
       throws QuotaExceededException, UnresolvedLinkException, 
       FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -475,14 +476,15 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now);
+    fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
     return true;
   }
 
   /**
    * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
    */
-  void renameTo(String src, String dst, Options.Rename... options)
+  void renameTo(String src, String dst, boolean logRetryCache, 
+      Options.Rename... options)
       throws FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, QuotaExceededException,
       UnresolvedLinkException, IOException {
@@ -500,7 +502,7 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, options);
+    fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
   }
 
   /**
@@ -1176,7 +1178,7 @@ public class FSDirectory implements Clos
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  void concat(String target, String [] srcs) 
+  void concat(String target, String [] srcs, boolean supportRetryCache) 
       throws UnresolvedLinkException, QuotaExceededException,
       SnapshotAccessControlException, SnapshotException {
     writeLock();
@@ -1186,7 +1188,8 @@ public class FSDirectory implements Clos
       long timestamp = now();
       unprotectedConcat(target, srcs, timestamp);
       // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, timestamp);
+      fsImage.getEditLog().logConcat(target, srcs, timestamp, 
+          supportRetryCache);
     } finally {
       writeUnlock();
     }
@@ -1261,10 +1264,12 @@ public class FSDirectory implements Clos
    * @param src Path of a directory to delete
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
+   * @param logRetryCache Whether to record RPC IDs in editlog to support retry
+   *                      cache rebuilding.
    * @return true on successful deletion; else false
    */
   boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes) throws IOException {
+      List<INode> removedINodes, boolean logRetryCache) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
@@ -1299,7 +1304,7 @@ public class FSDirectory implements Clos
     if (filesRemoved < 0) {
       return false;
     }
-    fsImage.getEditLog().logDelete(src, now);
+    fsImage.getEditLog().logDelete(src, now, logRetryCache);
     incrDeletedFileCount(filesRemoved);
     // Blocks/INodes will be handled later by the caller of this method
     getFSNamesystem().removePathAndBlocks(src, null, null);
@@ -2523,7 +2528,7 @@ public class FSDirectory implements Clos
   /**
    * Create FileStatus by file INode 
    */
-   private HdfsFileStatus createFileStatus(byte[] path, INode node,
+   HdfsFileStatus createFileStatus(byte[] path, INode node,
        Snapshot snapshot) {
      long size = 0;     // length is zero for directories
      short replication = 0;
@@ -2596,7 +2601,7 @@ public class FSDirectory implements Clos
    * Add the given symbolic link to the fs. Record it in the edits log.
    */
   INodeSymlink addSymlink(String path, String target,
-      PermissionStatus dirPerms, boolean createParent)
+      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
       throws UnresolvedLinkException, FileAlreadyExistsException,
       QuotaExceededException, SnapshotAccessControlException {
     waitForReady();
@@ -2622,7 +2627,8 @@ public class FSDirectory implements Clos
       NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
       return null;
     }
-    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
+    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+        logRetryCache);
     
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jul 30 08:01:00 2013
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.conf.Configuration;
 
@@ -662,11 +663,20 @@ public class FSEditLog implements LogsPu
     LOG.info(buf);
   }
 
+  /** Record the RPC IDs if necessary */
+  private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
+    if (toLogRpcIds) {
+      op.setRpcClientId(Server.getClientId());
+      op.setRpcCallId(Server.getCallId());
+    }
+  }
+  
   /** 
    * Add open lease record to edit log. 
    * Records the block locations of the last block.
    */
-  public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
+  public void logOpenFile(String path, INodeFileUnderConstruction newNode,
+      boolean toLogRpcIds) {
     AddOp op = AddOp.getInstance(cache.get())
       .setInodeId(newNode.getId())
       .setPath(path)
@@ -678,8 +688,8 @@ public class FSEditLog implements LogsPu
       .setPermissionStatus(newNode.getPermissionStatus())
       .setClientName(newNode.getClientName())
       .setClientMachine(newNode.getClientMachine());
-    
-      logEdit(op);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
   }
 
   /** 
@@ -698,10 +708,12 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
+  public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
+      boolean toLogRpcIds) {
     UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
       .setPath(path)
       .setBlocks(file.getBlocks());
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -721,23 +733,26 @@ public class FSEditLog implements LogsPu
    * Add rename record to edit log
    * TODO: use String parameters until just before writing to disk
    */
-  void logRename(String src, String dst, long timestamp) {
+  void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
     RenameOldOp op = RenameOldOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
   /** 
    * Add rename record to edit log
    */
-  void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+  void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
+      Options.Rename... options) {
     RenameOp op = RenameOp.getInstance(cache.get())
       .setSource(src)
       .setDestination(dst)
       .setTimestamp(timestamp)
       .setOptions(options);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -784,21 +799,23 @@ public class FSEditLog implements LogsPu
   /**
    * concat(trg,src..) log
    */
-  void logConcat(String trg, String [] srcs, long timestamp) {
+  void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
     ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
       .setTarget(trg)
       .setSources(srcs)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
   /** 
    * Add delete file record to edit log
    */
-  void logDelete(String src, long timestamp) {
+  void logDelete(String src, long timestamp, boolean toLogRpcIds) {
     DeleteOp op = DeleteOp.getInstance(cache.get())
       .setPath(src)
       .setTimestamp(timestamp);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 
@@ -843,8 +860,8 @@ public class FSEditLog implements LogsPu
   /** 
    * Add a create symlink record.
    */
-  void logSymlink(String path, String value, long mtime, 
-                  long atime, INodeSymlink node) {
+  void logSymlink(String path, String value, long mtime, long atime,
+      INodeSymlink node, boolean toLogRpcIds) {
     SymlinkOp op = SymlinkOp.getInstance(cache.get())
       .setId(node.getId())
       .setPath(path)
@@ -852,6 +869,7 @@ public class FSEditLog implements LogsPu
       .setModificationTime(mtime)
       .setAccessTime(atime)
       .setPermissionStatus(node.getPermissionStatus());
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
@@ -896,22 +914,26 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
-  void logCreateSnapshot(String snapRoot, String snapName) {
+  void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
     CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
-  void logDeleteSnapshot(String snapRoot, String snapName) {
+  void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
     DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   
-  void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
+  void logRenameSnapshot(String path, String snapOldName, String snapNewName,
+      boolean toLogRpcIds) {
     RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
         .setSnapshotRoot(path).setSnapshotOldName(snapOldName)
         .setSnapshotNewName(snapNewName);
+    logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
   

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jul 30 08:01:00 2013
@@ -34,8 +34,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -277,7 +279,8 @@ public class FSEditLogLoader {
     if (LOG.isTraceEnabled()) {
       LOG.trace("replaying edit log: " + op);
     }
-
+    final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
+    
     switch (op.opCode) {
     case OP_ADD: {
       AddCloseOp addCloseOp = (AddCloseOp)op;
@@ -300,8 +303,8 @@ public class FSEditLogLoader {
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
         // get name and replication
-        final short replication  = fsNamesys.getBlockManager(
-            ).adjustReplication(addCloseOp.replication);
+        final short replication = fsNamesys.getBlockManager()
+            .adjustReplication(addCloseOp.replication);
         assert addCloseOp.blocks.length == 0;
 
         // add to the file tree
@@ -313,6 +316,13 @@ public class FSEditLogLoader {
             addCloseOp.clientName, addCloseOp.clientMachine);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
 
+        // add the op into retry cache if necessary
+        if (toAddRetryCache) {
+          HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
+              HdfsFileStatus.EMPTY_NAME, newFile, null);
+          fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+              addCloseOp.rpcCallId, stat);
+        }
       } else { // This is OP_ADD on an existing file
         if (!oldFile.isUnderConstruction()) {
           // This is case 3: a call to append() on an already-closed file.
@@ -320,11 +330,17 @@ public class FSEditLogLoader {
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
                 "for append");
           }
-          fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
-              addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false, iip.getLatestSnapshot());
+          LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
+              oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
+              false, iip.getLatestSnapshot(), false);
           newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
               addCloseOp.path, true);
+          
+          // add the op into retry cache is necessary
+          if (toAddRetryCache) {
+            fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+                addCloseOp.rpcCallId, lb);
+          }
         }
       }
       // Fall-through for case 2.
@@ -384,6 +400,10 @@ public class FSEditLogLoader {
           updateOp.path);
       // Update in-memory data structures
       updateBlocks(fsDir, updateOp, oldFile);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
+      }
       break;
     }
       
@@ -399,17 +419,30 @@ public class FSEditLogLoader {
       ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
       fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
           concatDeleteOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
+            concatDeleteOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME_OLD: {
       RenameOldOp renameOp = (RenameOldOp)op;
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+      }
       break;
     }
     case OP_DELETE: {
       DeleteOp deleteOp = (DeleteOp)op;
       fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
+      }
       break;
     }
     case OP_MKDIR: {
@@ -474,12 +507,20 @@ public class FSEditLogLoader {
       fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
                                   symlinkOp.value, symlinkOp.mtime, 
                                   symlinkOp.atime, symlinkOp.permissionStatus);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME: {
       RenameOp renameOp = (RenameOp)op;
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp, renameOp.options);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+      }
       break;
     }
     case OP_GET_DELEGATION_TOKEN: {
@@ -532,8 +573,12 @@ public class FSEditLogLoader {
     }
     case OP_CREATE_SNAPSHOT: {
       CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
-      fsNamesys.getSnapshotManager().createSnapshot(
+      String path = fsNamesys.getSnapshotManager().createSnapshot(
           createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
+            createSnapshotOp.rpcCallId, path);
+      }
       break;
     }
     case OP_DELETE_SNAPSHOT: {
@@ -547,6 +592,11 @@ public class FSEditLogLoader {
       collectedBlocks.clear();
       fsNamesys.dir.removeFromInodeMap(removedINodes);
       removedINodes.clear();
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
+            deleteSnapshotOp.rpcCallId);
+      }
       break;
     }
     case OP_RENAME_SNAPSHOT: {
@@ -554,6 +604,11 @@ public class FSEditLogLoader {
       fsNamesys.getSnapshotManager().renameSnapshot(
           renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
           renameSnapshotOp.snapshotNewName);
+      
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
+            renameSnapshotOp.rpcCallId);
+      }
       break;
     }
     case OP_ALLOW_SNAPSHOT: {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jul 30 08:01:00 2013
@@ -17,55 +17,88 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.util.PureJavaCrc32;
-
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.util.XMLUtils;
+import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
+import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.hdfs.util.XMLUtils;
-import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
-import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
 import com.google.common.base.Preconditions;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.EOFException;
-
 /**
  * Helper classes for reading the ops from an InputStream.
  * All ops derive from FSEditLogOp and are only
@@ -76,6 +109,8 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
+  byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
+  int rpcCallId = RpcConstants.INVALID_CALL_ID;
 
   @SuppressWarnings("deprecation")
   final public static class OpInstanceCache {
@@ -150,6 +185,31 @@ public abstract class FSEditLogOp {
   public void setTransactionId(long txid) {
     this.txid = txid;
   }
+  
+  public boolean hasRpcIds() {
+    return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
+        && rpcCallId != RpcConstants.INVALID_CALL_ID;
+  }
+  
+  /** this has to be called after calling {@link #hasRpcIds()} */
+  public byte[] getClientId() {
+    Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
+    return rpcClientId;
+  }
+  
+  public void setRpcClientId(byte[] clientId) {
+    this.rpcClientId = clientId;
+  }
+  
+  /** this has to be called after calling {@link #hasRpcIds()} */
+  public int getCallId() {
+    Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
+    return rpcCallId;
+  }
+  
+  public void setRpcCallId(int callId) {
+    this.rpcCallId = callId;
+  }
 
   abstract void readFields(DataInputStream in, int logVersion)
       throws IOException;
@@ -163,6 +223,46 @@ public abstract class FSEditLogOp {
     boolean shouldCompleteLastBlock();
   }
   
+  private static void writeRpcIds(final byte[] clientId, final int callId,
+      DataOutputStream out) throws IOException {
+    FSImageSerialization.writeBytes(clientId, out);
+    FSImageSerialization.writeInt(callId, out);
+  }
+  
+  void readRpcIds(DataInputStream in, int logVersion)
+      throws IOException {
+    if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
+        logVersion)) {
+      this.rpcClientId = FSImageSerialization.readBytes(in);
+      this.rpcCallId = FSImageSerialization.readInt(in);
+    }
+  }
+  
+  void readRpcIdsFromXml(Stanza st) {
+    this.rpcClientId = st.hasChildren("RPC_CLIENTID") ? 
+        ClientId.toBytes(st.getValue("RPC_CLIENTID"))
+        : RpcConstants.DUMMY_CLIENT_ID;
+    this.rpcCallId = st.hasChildren("RPC_CALLID") ? 
+        Integer.valueOf(st.getValue("RPC_CALLID"))
+        : RpcConstants.INVALID_CALL_ID;
+  }
+  
+  private static void appendRpcIdsToString(final StringBuilder builder,
+      final byte[] clientId, final int callId) {
+    builder.append(", RpcClientId=");
+    builder.append(ClientId.toString(clientId));
+    builder.append(", RpcCallId=");
+    builder.append(callId);
+  }
+  
+  private static void appendRpcIdsToXml(ContentHandler contentHandler,
+      final byte[] clientId, final int callId) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
+        ClientId.toString(clientId));
+    XMLUtils.addSaxString(contentHandler, "RPC_CALLID", 
+        Integer.valueOf(callId).toString());
+  }
+  
   @SuppressWarnings("unchecked")
   static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
     int length;
@@ -176,7 +276,7 @@ public abstract class FSEditLogOp {
     PermissionStatus permissions;
     String clientName;
     String clientMachine;
-
+    
     private AddCloseOp(FSEditLogOpCodes opCode) {
       super(opCode);
       assert(opCode == OP_ADD || opCode == OP_CLOSE);
@@ -247,8 +347,7 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(inodeId, out);
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeShort(replication, out);
@@ -261,6 +360,8 @@ public abstract class FSEditLogOp {
       if (this.opCode == OP_ADD) {
         FSImageSerialization.writeString(clientName,out);
         FSImageSerialization.writeString(clientMachine,out);
+        // write clientId and callId
+        writeRpcIds(rpcClientId, rpcCallId, out);
       }
     }
 
@@ -317,6 +418,8 @@ public abstract class FSEditLogOp {
       if (this.opCode == OP_ADD) {
         this.clientName = FSImageSerialization.readString(in);
         this.clientMachine = FSImageSerialization.readString(in);
+        // read clientId and callId
+        readRpcIds(in, logVersion);
       } else {
         this.clientName = "";
         this.clientMachine = "";
@@ -368,6 +471,9 @@ public abstract class FSEditLogOp {
       builder.append(clientName);
       builder.append(", clientMachine=");
       builder.append(clientMachine);
+      if (this.opCode == OP_ADD) {
+        appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      }
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -397,9 +503,13 @@ public abstract class FSEditLogOp {
         FSEditLogOp.blockToXml(contentHandler, b);
       }
       FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
+      if (this.opCode == OP_ADD) {
+        appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+      }
     }
 
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
@@ -420,9 +530,14 @@ public abstract class FSEditLogOp {
       }
       this.permissions =
           permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      readRpcIdsFromXml(st);
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
+   * {@link ClientProtocol#appendFile}
+   */
   static class AddOp extends AddCloseOp {
     private AddOp() {
       super(OP_ADD);
@@ -446,6 +561,11 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * Although {@link ClientProtocol#appendFile} may also log a close op, we do
+   * not need to record the rpc ids here since a successful appendFile op will
+   * finally log an AddOp.
+   */
   static class CloseOp extends AddCloseOp {
     private CloseOp() {
       super(OP_CLOSE);
@@ -469,6 +589,10 @@ public abstract class FSEditLogOp {
     }
   }
   
+  /**
+   * {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but 
+   * {@literal @Idempotent} for some other ops.
+   */
   static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
     String path;
     Block[] blocks;
@@ -481,7 +605,6 @@ public abstract class FSEditLogOp {
       return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
     }
     
-    
     UpdateBlocksOp setPath(String path) {
       this.path = path;
       return this;
@@ -507,6 +630,8 @@ public abstract class FSEditLogOp {
     void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeCompactBlockArray(blocks, out);
+      // clientId and callId
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
     
     @Override
@@ -514,6 +639,7 @@ public abstract class FSEditLogOp {
       path = FSImageSerialization.readString(in);
       this.blocks = FSImageSerialization.readCompactBlockArray(
           in, logVersion);
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -527,8 +653,9 @@ public abstract class FSEditLogOp {
       sb.append("UpdateBlocksOp [path=")
         .append(path)
         .append(", blocks=")
-        .append(Arrays.toString(blocks))
-        .append("]");
+        .append(Arrays.toString(blocks));
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      sb.append("]");
       return sb.toString();
     }
     
@@ -538,6 +665,7 @@ public abstract class FSEditLogOp {
       for (Block b : blocks) {
         FSEditLogOp.blockToXml(contentHandler, b);
       }
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -547,9 +675,11 @@ public abstract class FSEditLogOp {
       for (int i = 0; i < blocks.size(); i++) {
         this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
       }
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
   static class SetReplicationOp extends FSEditLogOp {
     String path;
     short replication;
@@ -618,6 +748,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
   static class ConcatDeleteOp extends FSEditLogOp {
     int length;
     String trg;
@@ -654,8 +785,7 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(trg, out);
             
       DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
@@ -666,6 +796,9 @@ public abstract class FSEditLogOp {
       new ArrayWritable(DeprecatedUTF8.class, info).write(out);
 
       FSImageSerialization.writeLong(timestamp, out);
+      
+      // rpc ids
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -704,6 +837,8 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -717,6 +852,7 @@ public abstract class FSEditLogOp {
       builder.append(Arrays.toString(srcs));
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -738,6 +874,7 @@ public abstract class FSEditLogOp {
             "SOURCE" + (i + 1), srcs[i]);
       }
       contentHandler.endElement("", "", "SOURCES");
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -755,9 +892,11 @@ public abstract class FSEditLogOp {
       for (i = 0; i < srcs.length; i++) {
         srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
       }
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
   static class RenameOldOp extends FSEditLogOp {
     int length;
     String src;
@@ -793,6 +932,7 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(src, out);
       FSImageSerialization.writeString(dst, out);
       FSImageSerialization.writeLong(timestamp, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -812,6 +952,9 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -825,6 +968,7 @@ public abstract class FSEditLogOp {
       builder.append(dst);
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -841,16 +985,21 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "DST", dst);
       XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
           Long.valueOf(timestamp).toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.src = st.getValue("SRC");
       this.dst = st.getValue("DST");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+      
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
   static class DeleteOp extends FSEditLogOp {
     int length;
     String path;
@@ -879,6 +1028,7 @@ public abstract class FSEditLogOp {
     void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeLong(timestamp, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -896,6 +1046,8 @@ public abstract class FSEditLogOp {
       } else {
         this.timestamp = readLong(in);
       }
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -907,6 +1059,7 @@ public abstract class FSEditLogOp {
       builder.append(path);
       builder.append(", timestamp=");
       builder.append(timestamp);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -922,15 +1075,19 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "PATH", path);
       XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
           Long.valueOf(timestamp).toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.path = st.getValue("PATH");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+      
+      readRpcIdsFromXml(st);
     }
   }
-    
+
+  /** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
   static class MkdirOp extends FSEditLogOp {
     int length;
     long inodeId;
@@ -1056,6 +1213,13 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * The corresponding operations are either {@literal @Idempotent} (
+   * {@link ClientProtocol#updateBlockForPipeline},
+   * {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
+   * already bound with other editlog op which records rpc ids (
+   * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
+   */
   static class SetGenstampV1Op extends FSEditLogOp {
     long genStampV1;
 
@@ -1108,6 +1272,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** Similar with {@link SetGenstampV1Op} */
   static class SetGenstampV2Op extends FSEditLogOp {
     long genStampV2;
 
@@ -1160,6 +1325,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
   static class AllocateBlockIdOp extends FSEditLogOp {
     long blockId;
 
@@ -1212,6 +1378,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
   static class SetPermissionsOp extends FSEditLogOp {
     String src;
     FsPermission permissions;
@@ -1277,6 +1444,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
   static class SetOwnerOp extends FSEditLogOp {
     String src;
     String username;
@@ -1357,7 +1525,7 @@ public abstract class FSEditLogOp {
           st.getValue("GROUPNAME") : null;
     }
   }
-
+  
   static class SetNSQuotaOp extends FSEditLogOp {
     String src;
     long nsQuota;
@@ -1457,6 +1625,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
   static class SetQuotaOp extends FSEditLogOp {
     String src;
     long nsQuota;
@@ -1534,6 +1703,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
   static class TimesOp extends FSEditLogOp {
     int length;
     String path;
@@ -1629,6 +1799,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
   static class SymlinkOp extends FSEditLogOp {
     int length;
     long inodeId;
@@ -1677,14 +1848,14 @@ public abstract class FSEditLogOp {
     }
 
     @Override
-    public 
-    void writeFields(DataOutputStream out) throws IOException {
+    public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(inodeId, out);      
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeString(value, out);
       FSImageSerialization.writeLong(mtime, out);
       FSImageSerialization.writeLong(atime, out);
       permissionStatus.write(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -1714,6 +1885,9 @@ public abstract class FSEditLogOp {
         this.atime = readLong(in);
       }
       this.permissionStatus = PermissionStatus.read(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -1733,6 +1907,7 @@ public abstract class FSEditLogOp {
       builder.append(atime);
       builder.append(", permissionStatus=");
       builder.append(permissionStatus);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -1754,9 +1929,11 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "ATIME",
           Long.valueOf(atime).toString());
       FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
-    @Override void fromXml(Stanza st) throws InvalidXmlException {
+    @Override 
+    void fromXml(Stanza st) throws InvalidXmlException {
       this.length = Integer.valueOf(st.getValue("LENGTH"));
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
@@ -1765,9 +1942,12 @@ public abstract class FSEditLogOp {
       this.atime = Long.valueOf(st.getValue("ATIME"));
       this.permissionStatus =
           permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      
+      readRpcIdsFromXml(st);
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
   static class RenameOp extends FSEditLogOp {
     int length;
     String src;
@@ -1810,6 +1990,7 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(dst, out);
       FSImageSerialization.writeLong(timestamp, out);
       toBytesWritable(options).write(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -1830,6 +2011,9 @@ public abstract class FSEditLogOp {
         this.timestamp = readLong(in);
       }
       this.options = readRenameOptions(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
@@ -1866,6 +2050,7 @@ public abstract class FSEditLogOp {
       builder.append(timestamp);
       builder.append(", options=");
       builder.append(Arrays.toString(options));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append(", opCode=");
       builder.append(opCode);
       builder.append(", txid=");
@@ -1889,6 +2074,7 @@ public abstract class FSEditLogOp {
         prefix = "|";
       }
       XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
     
     @Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -1910,9 +2096,15 @@ public abstract class FSEditLogOp {
           }
         }
       }
+      readRpcIdsFromXml(st);
     }
   }
-
+ 
+  /**
+   * {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. In the
+   * meanwhile, startFile and appendFile both have their own corresponding
+   * editlog op.
+   */
   static class ReassignLeaseOp extends FSEditLogOp {
     String leaseHolder;
     String path;
@@ -1988,6 +2180,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
   static class GetDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
@@ -2059,6 +2252,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
   static class RenewDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
@@ -2130,6 +2324,7 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
   static class CancelDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
 
@@ -2323,7 +2518,8 @@ public abstract class FSEditLogOp {
   }
 
   /**
-   * Operation corresponding to creating a snapshot
+   * Operation corresponding to creating a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
    */
   static class CreateSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2351,24 +2547,31 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion) throws IOException {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotName = st.getValue("SNAPSHOTNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2378,13 +2581,15 @@ public abstract class FSEditLogOp {
       builder.append(snapshotRoot);
       builder.append(", snapshotName=");
       builder.append(snapshotName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
   
   /**
-   * Operation corresponding to delete a snapshot
+   * Operation corresponding to delete a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
    */
   static class DeleteSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2412,24 +2617,31 @@ public abstract class FSEditLogOp {
     void readFields(DataInputStream in, int logVersion) throws IOException {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotName = st.getValue("SNAPSHOTNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2439,13 +2651,15 @@ public abstract class FSEditLogOp {
       builder.append(snapshotRoot);
       builder.append(", snapshotName=");
       builder.append(snapshotName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
   
   /**
-   * Operation corresponding to rename a snapshot
+   * Operation corresponding to rename a snapshot.
+   * {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
    */
   static class RenameSnapshotOp extends FSEditLogOp {
     String snapshotRoot;
@@ -2480,6 +2694,9 @@ public abstract class FSEditLogOp {
       snapshotRoot = FSImageSerialization.readString(in);
       snapshotOldName = FSImageSerialization.readString(in);
       snapshotNewName = FSImageSerialization.readString(in);
+      
+      // read RPC ids if necessary
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -2487,6 +2704,8 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(snapshotRoot, out);
       FSImageSerialization.writeString(snapshotOldName, out);
       FSImageSerialization.writeString(snapshotNewName, out);
+      
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -2494,6 +2713,7 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
       XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
@@ -2501,6 +2721,8 @@ public abstract class FSEditLogOp {
       snapshotRoot = st.getValue("SNAPSHOTROOT");
       snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
       snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
+      
+      readRpcIdsFromXml(st);
     }
     
     @Override
@@ -2512,6 +2734,7 @@ public abstract class FSEditLogOp {
       builder.append(snapshotOldName);
       builder.append(", snapshotNewName=");
       builder.append(snapshotNewName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
@@ -2520,7 +2743,7 @@ public abstract class FSEditLogOp {
   /**
    * Operation corresponding to allow creating snapshot on a directory
    */
-  static class AllowSnapshotOp extends FSEditLogOp {
+  static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
     String snapshotRoot;
 
     public AllowSnapshotOp() {
@@ -2574,7 +2797,7 @@ public abstract class FSEditLogOp {
   /**
    * Operation corresponding to disallow creating snapshot on a directory
    */
-  static class DisallowSnapshotOp extends FSEditLogOp {
+  static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
     String snapshotRoot;
 
     public DisallowSnapshotOp() {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Tue Jul 30 08:01:00 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
@@ -80,6 +81,7 @@ public class FSImageSerialization {
   static private final class TLData {
     final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
     final ShortWritable U_SHORT = new ShortWritable();
+    final IntWritable U_INT = new IntWritable();
     final LongWritable U_LONG = new LongWritable();
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
@@ -350,9 +352,9 @@ public class FSImageSerialization {
   
   /** read the long value */
   static long readLong(DataInput in) throws IOException {
-    LongWritable ustr = TL_DATA.get().U_LONG;
-    ustr.readFields(in);
-    return ustr.get();
+    LongWritable uLong = TL_DATA.get().U_LONG;
+    uLong.readFields(in);
+    return uLong.get();
   }
 
   /** write the long value */
@@ -361,6 +363,20 @@ public class FSImageSerialization {
     uLong.set(value);
     uLong.write(out);
   }
+  
+  /** read the int value */
+  static int readInt(DataInput in) throws IOException {
+    IntWritable uInt = TL_DATA.get().U_INT;
+    uInt.readFields(in);
+    return uInt.get();
+  }
+  
+  /** write the int value */
+  static void writeInt(int value, DataOutputStream out) throws IOException {
+    IntWritable uInt = TL_DATA.get().U_INT;
+    uInt.set(value);
+    uInt.write(out);
+  }
 
   /** read short value */
   static short readShort(DataInput in) throws IOException {
@@ -414,8 +430,13 @@ public class FSImageSerialization {
   private static void writeLocalName(INodeAttributes inode, DataOutput out)
       throws IOException {
     final byte[] name = inode.getLocalNameBytes();
-    out.writeShort(name.length);
-    out.write(name);
+    writeBytes(name, out);
+  }
+  
+  public static void writeBytes(byte[] data, DataOutput out)
+      throws IOException {
+    out.writeShort(data.length);
+    out.write(data);
   }
 
   /**

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul 30 08:01:00 2013
@@ -547,7 +547,7 @@ public class FSNamesystem implements Nam
     FSImage fsImage = new FSImage(conf,
         FSNamesystem.getNamespaceDirs(conf),
         FSNamesystem.getNamespaceEditsDirs(conf));
-    FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
     StartupOption startOpt = NameNode.getStartupOption(conf);
     if (startOpt == StartupOption.RECOVER) {
       namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@@ -565,7 +565,11 @@ public class FSNamesystem implements Nam
     }
     return namesystem;
   }
-
+  
+  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
+    this(conf, fsImage, false);
+  }
+  
   /**
    * Create an FSNamesystem associated with the specified image.
    * 
@@ -574,9 +578,12 @@ public class FSNamesystem implements Nam
    *
    * @param conf configuration
    * @param fsImage The FSImage to associate with
+   * @param ignoreRetryCache Whether or not should ignore the retry cache setup
+   *                         step. For Secondary NN this should be set to true.
    * @throws IOException on bad configuration
    */
-  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
+  FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
+      throws IOException {
     try {
       resourceRecheckInterval = conf.getLong(
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@@ -667,7 +674,7 @@ public class FSNamesystem implements Nam
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
-      this.retryCache = initRetryCache(conf);
+      this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -680,6 +687,28 @@ public class FSNamesystem implements Nam
   }
   
   @VisibleForTesting
+  public RetryCache getRetryCache() {
+    return retryCache;
+  }
+  
+  /** Whether or not retry cache is enabled */
+  boolean hasRetryCache() {
+    return retryCache != null;
+  }
+  
+  void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) {
+    if (retryCache != null) {
+      retryCache.addCacheEntryWithPayload(clientId, callId, payload);
+    }
+  }
+  
+  void addCacheEntry(byte[] clientId, int callId) {
+    if (retryCache != null) {
+      retryCache.addCacheEntry(clientId, callId);
+    }
+  }
+  
+  @VisibleForTesting
   static RetryCache initRetryCache(Configuration conf) {
     boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
         DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
@@ -1518,7 +1547,7 @@ public class FSNamesystem implements Nam
     
     boolean success = false;
     try {
-      concatInt(target, srcs);
+      concatInt(target, srcs, cacheEntry != null);
       success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
@@ -1528,8 +1557,8 @@ public class FSNamesystem implements Nam
     }
   }
 
-  private void concatInt(String target, String [] srcs) 
-      throws IOException, UnresolvedLinkException {
+  private void concatInt(String target, String [] srcs, 
+      boolean logRetryCache) throws IOException, UnresolvedLinkException {
     // verify args
     if(target.isEmpty()) {
       throw new IllegalArgumentException("Target file name is empty");
@@ -1558,7 +1587,7 @@ public class FSNamesystem implements Nam
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-      concatInternal(pc, target, srcs);
+      concatInternal(pc, target, srcs, logRetryCache);
       resultingStat = getAuditFileInfo(target, false);
     } finally {
       writeUnlock();
@@ -1568,8 +1597,9 @@ public class FSNamesystem implements Nam
   }
 
   /** See {@link #concat(String, String[])} */
-  private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) 
-      throws IOException, UnresolvedLinkException {
+  private void concatInternal(FSPermissionChecker pc, String target,
+      String[] srcs, boolean logRetryCache) throws IOException,
+      UnresolvedLinkException {
     assert hasWriteLock();
 
     // write permission for the target
@@ -1673,7 +1703,7 @@ public class FSNamesystem implements Nam
           Arrays.toString(srcs) + " to " + target);
     }
 
-    dir.concat(target,srcs);
+    dir.concat(target,srcs, logRetryCache);
   }
   
   /**
@@ -1745,7 +1775,7 @@ public class FSNamesystem implements Nam
     }
     boolean success = false;
     try {
-      createSymlinkInt(target, link, dirPerms, createParent);
+      createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
       success = true;
     } catch (AccessControlException e) {
       logAuditEvent(false, "createSymlink", link, target, null);
@@ -1756,7 +1786,7 @@ public class FSNamesystem implements Nam
   }
 
   private void createSymlinkInt(String target, String link,
-      PermissionStatus dirPerms, boolean createParent) 
+      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache) 
       throws IOException, UnresolvedLinkException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
@@ -1787,7 +1817,7 @@ public class FSNamesystem implements Nam
       checkFsObjectLimit();
 
       // add symbolic link to namespace
-      dir.addSymlink(link, target, dirPerms, createParent);
+      dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -1917,7 +1947,7 @@ public class FSNamesystem implements Nam
     
     try {
       status = startFileInt(src, permissions, holder, clientMachine, flag,
-          createParent, replication, blockSize);
+          createParent, replication, blockSize, cacheEntry != null);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
@@ -1929,8 +1959,8 @@ public class FSNamesystem implements Nam
 
   private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize)
-      throws AccessControlException, SafeModeException,
+      boolean createParent, short replication, long blockSize,
+      boolean logRetryCache) throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1965,8 +1995,8 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot create file" + src, safeMode);
       }
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      startFileInternal(pc, src, permissions, holder, clientMachine,
-          create, overwrite, createParent, replication, blockSize);
+      startFileInternal(pc, src, permissions, holder, clientMachine, create,
+          overwrite, createParent, replication, blockSize, logRetryCache);
       stat = dir.getFileInfo(src, false);
     } catch (StandbyException se) {
       skipSync = true;
@@ -1995,8 +2025,9 @@ public class FSNamesystem implements Nam
   private void startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
       boolean create, boolean overwrite, boolean createParent,
-      short replication, long blockSize) throws FileAlreadyExistsException,
-      AccessControlException, UnresolvedLinkException, FileNotFoundException,
+      short replication, long blockSize, boolean logRetryEntry)
+      throws FileAlreadyExistsException, AccessControlException,
+      UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
@@ -2028,7 +2059,7 @@ public class FSNamesystem implements Nam
       } else {
         if (overwrite) {
           try {
-            deleteInt(src, true); // File exists - delete if overwrite
+            deleteInt(src, true, false); // File exists - delete if overwrite
           } catch (AccessControlException e) {
             logAuditEvent(false, "delete", src);
             throw e;
@@ -2054,7 +2085,7 @@ public class FSNamesystem implements Nam
       leaseManager.addLease(newNode.getClientName(), src);
 
       // record file record in log, record new generation stamp
-      getEditLog().logOpenFile(src, newNode);
+      getEditLog().logOpenFile(src, newNode, logRetryEntry);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                    +"add "+src+" to namespace for "+holder);
@@ -2083,8 +2114,9 @@ public class FSNamesystem implements Nam
    * @return the last block locations if the block is partial or null otherwise
    */
   private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
-      String holder, String clientMachine) throws AccessControlException,
-      UnresolvedLinkException, FileNotFoundException, IOException {
+      String holder, String clientMachine, boolean logRetryCache)
+      throws AccessControlException, UnresolvedLinkException,
+      FileNotFoundException, IOException {
     assert hasWriteLock();
     // Verify that the destination does not exist as a directory already.
     final INodesInPath iip = dir.getINodesInPath4Write(src);
@@ -2109,7 +2141,7 @@ public class FSNamesystem implements Nam
       final DatanodeDescriptor clientNode = 
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
-          true, iip.getLatestSnapshot());
+          true, iip.getLatestSnapshot(), logRetryCache);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
       throw ie;
@@ -2126,13 +2158,16 @@ public class FSNamesystem implements Nam
    * @param clientMachine identifier of the client machine
    * @param clientNode if the client is collocated with a DN, that DN's descriptor
    * @param writeToEditLog whether to persist this change to the edit log
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding
    * @return the last block locations if the block is partial or null otherwise
    * @throws UnresolvedLinkException
    * @throws IOException
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
+      boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
+      throws IOException {
     file = file.recordModification(latestSnapshot, dir.getINodeMap());
     final INodeFileUnderConstruction cons = file.toUnderConstruction(
         leaseHolder, clientMachine, clientNode);
@@ -2142,7 +2177,7 @@ public class FSNamesystem implements Nam
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
     if (writeToEditLog) {
-      getEditLog().logOpenFile(src, cons);
+      getEditLog().logOpenFile(src, cons, logRetryCache);
     }
     return ret;
   }
@@ -2290,7 +2325,7 @@ public class FSNamesystem implements Nam
       
     boolean success = false;
     try {
-      lb = appendFileInt(src, holder, clientMachine);
+      lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
       success = true;
       return lb;
     } catch (AccessControlException e) {
@@ -2302,7 +2337,8 @@ public class FSNamesystem implements Nam
   }
 
   private LocatedBlock appendFileInt(String src, String holder,
-      String clientMachine) throws AccessControlException, SafeModeException,
+      String clientMachine, boolean logRetryCache)
+      throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2328,7 +2364,7 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot append to file" + src, safeMode);
       }
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      lb = appendFileInternal(pc, src, holder, clientMachine);
+      lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2452,7 +2488,7 @@ public class FSNamesystem implements Nam
       newBlock = createNewBlock();
       saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
-      dir.persistBlocks(src, pendingFile);
+      dir.persistBlocks(src, pendingFile, false);
       offset = pendingFile.computeFileSize();
     } finally {
       writeUnlock();
@@ -2654,7 +2690,7 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      dir.persistBlocks(src, file);
+      dir.persistBlocks(src, file, false);
     } finally {
       writeUnlock();
     }
@@ -2871,7 +2907,7 @@ public class FSNamesystem implements Nam
     }
     boolean ret = false;
     try {
-      ret = renameToInt(src, dst);
+      ret = renameToInt(src, dst, cacheEntry != null);
     } catch (AccessControlException e) {
       logAuditEvent(false, "rename", src, dst, null);
       throw e;
@@ -2881,7 +2917,7 @@ public class FSNamesystem implements Nam
     return ret;
   }
 
-  private boolean renameToInt(String src, String dst) 
+  private boolean renameToInt(String src, String dst, boolean logRetryCache) 
     throws IOException, UnresolvedLinkException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
@@ -2905,7 +2941,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
       checkOperation(OperationCategory.WRITE);
-      status = renameToInternal(pc, src, dst);
+      status = renameToInternal(pc, src, dst, logRetryCache);
       if (status) {
         resultingStat = getAuditFileInfo(dst, false);
       }
@@ -2921,8 +2957,9 @@ public class FSNamesystem implements Nam
 
   /** @deprecated See {@link #renameTo(String, String)} */
   @Deprecated
-  private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
-    throws IOException, UnresolvedLinkException {
+  private boolean renameToInternal(FSPermissionChecker pc, String src,
+      String dst, boolean logRetryCache) throws IOException,
+      UnresolvedLinkException {
     assert hasWriteLock();
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
@@ -2935,7 +2972,7 @@ public class FSNamesystem implements Nam
       checkAncestorAccess(pc, actualdst, FsAction.WRITE);
     }
 
-    if (dir.renameTo(src, dst)) {
+    if (dir.renameTo(src, dst, logRetryCache)) {
       return true;
     }
     return false;
@@ -2970,7 +3007,7 @@ public class FSNamesystem implements Nam
       }
       src = FSDirectory.resolvePath(src, srcComponents, dir);
       dst = FSDirectory.resolvePath(dst, dstComponents, dir);
-      renameToInternal(pc, src, dst, options);
+      renameToInternal(pc, src, dst, cacheEntry != null, options);
       resultingStat = getAuditFileInfo(dst, false);
       success = true;
     } finally {
@@ -2988,14 +3025,14 @@ public class FSNamesystem implements Nam
   }
 
   private void renameToInternal(FSPermissionChecker pc, String src, String dst,
-      Options.Rename... options) throws IOException {
+      boolean logRetryCache, Options.Rename... options) throws IOException {
     assert hasWriteLock();
     if (isPermissionEnabled) {
       checkParentAccess(pc, src, FsAction.WRITE);
       checkAncestorAccess(pc, dst, FsAction.WRITE);
     }
 
-    dir.renameTo(src, dst, options);
+    dir.renameTo(src, dst, logRetryCache, options);
   }
   
   /**
@@ -3013,7 +3050,7 @@ public class FSNamesystem implements Nam
     }
     boolean ret = false;
     try {
-      ret = deleteInt(src, recursive);
+      ret = deleteInt(src, recursive, cacheEntry != null);
     } catch (AccessControlException e) {
       logAuditEvent(false, "delete", src);
       throw e;
@@ -3023,13 +3060,13 @@ public class FSNamesystem implements Nam
     return ret;
   }
       
-  private boolean deleteInt(String src, boolean recursive)
+  private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
       throws AccessControlException, SafeModeException,
       UnresolvedLinkException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     }
-    boolean status = deleteInternal(src, recursive, true);
+    boolean status = deleteInternal(src, recursive, true, logRetryCache);
     if (status) {
       logAuditEvent(true, "delete", src);
     }
@@ -3057,7 +3094,7 @@ public class FSNamesystem implements Nam
    * @see ClientProtocol#delete(String, boolean) for description of exceptions
    */
   private boolean deleteInternal(String src, boolean recursive,
-      boolean enforcePermission)
+      boolean enforcePermission, boolean logRetryCache)
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@@ -3081,7 +3118,7 @@ public class FSNamesystem implements Nam
             FsAction.ALL, false);
       }
       // Unlink the target directory from directory tree
-      if (!dir.delete(src, collectedBlocks, removedINodes)) {
+      if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
         return false;
       }
       ret = true;
@@ -3410,7 +3447,7 @@ public class FSNamesystem implements Nam
       if (lastBlockLength > 0) {
         pendingFile.updateLengthOfLastBlock(lastBlockLength);
       }
-      dir.persistBlocks(src, pendingFile);
+      dir.persistBlocks(src, pendingFile, false);
     } finally {
       writeUnlock();
     }
@@ -3707,7 +3744,7 @@ public class FSNamesystem implements Nam
         src = closeFileCommitBlocks(pendingFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        src = persistBlocks(pendingFile);
+        src = persistBlocks(pendingFile, false);
       }
     } finally {
       writeUnlock();
@@ -3756,10 +3793,10 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   @VisibleForTesting
-  String persistBlocks(INodeFileUnderConstruction pendingFile)
-      throws IOException {
+  String persistBlocks(INodeFileUnderConstruction pendingFile,
+      boolean logRetryCache) throws IOException {
     String src = leaseManager.findPath(pendingFile);
-    dir.persistBlocks(src, pendingFile);
+    dir.persistBlocks(src, pendingFile, logRetryCache);
     return src;
   }
 
@@ -5545,7 +5582,8 @@ public class FSNamesystem implements Nam
       }
       assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
         + oldBlock + " has different block identifier";
-      updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+      updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
+          cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -5557,7 +5595,7 @@ public class FSNamesystem implements Nam
 
   /** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
   private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, 
-      ExtendedBlock newBlock, DatanodeID[] newNodes)
+      ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache)
       throws IOException {
     assert hasWriteLock();
     // check the vadility of the block and lease holder name
@@ -5592,7 +5630,7 @@ public class FSNamesystem implements Nam
     blockinfo.setExpectedLocations(descriptors);
 
     String src = leaseManager.findPath(pendingFile);
-    dir.persistBlocks(src, pendingFile);
+    dir.persistBlocks(src, pendingFile, logRetryCache);
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -6357,7 +6395,8 @@ public class FSNamesystem implements Nam
       } finally {
         dir.writeUnlock();
       }
-      getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
+      getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
+          cacheEntry != null);
     } finally {
       writeUnlock();
       RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
@@ -6399,7 +6438,8 @@ public class FSNamesystem implements Nam
       dir.verifySnapshotName(snapshotNewName, path);
       
       snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
-      getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+      getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
+          cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -6523,7 +6563,8 @@ public class FSNamesystem implements Nam
       removedINodes.clear();
       this.removeBlocks(collectedBlocks);
       collectedBlocks.clear();
-      getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+      getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
+          cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jul 30 08:01:00 2013
@@ -726,7 +726,8 @@ public class NameNode {
   }
     
   /** get FSImage */
-  FSImage getFSImage() {
+  @VisibleForTesting
+  public FSImage getFSImage() {
     return namesystem.dir.fsImage;
   }