You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/08/22 03:12:57 UTC

svn commit: r1516351 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/DFSClient.java src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

Author: jing9
Date: Thu Aug 22 01:12:56 2013
New Revision: 1516351

URL: http://svn.apache.org/r1516351
Log:
HDFS-5045. Merge change r1516349 from branch-2.

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1516351&r1=1516350&r2=1516351&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 22 01:12:56 2013
@@ -46,6 +46,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5111. Remove duplicated error message for snapshot commands when 
     processing invalid arguments. (jing9)
 
+    HDFS-5045. Add more unit tests for retry cache to cover all AtMostOnce 
+    methods. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1516351&r1=1516350&r2=1516351&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Aug 22 01:12:56 2013
@@ -582,7 +582,8 @@ public class DFSClient implements java.i
     return dfsClientConf.hdfsTimeout;
   }
   
-  String getClientName() {
+  @VisibleForTesting
+  public String getClientName() {
     return clientName;
   }
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1516351&r1=1516350&r2=1516351&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Thu Aug 22 01:12:56 2013
@@ -26,15 +26,22 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -43,9 +50,20 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryInvocationHandler;
@@ -60,14 +78,13 @@ import org.junit.Test;
 public class TestRetryCacheWithHA {
   private static final Log LOG = LogFactory.getLog(TestRetryCacheWithHA.class);
   
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem dfs;
-  private static Configuration conf = new HdfsConfiguration();
-  
   private static final int BlockSize = 1024;
   private static final short DataNodes = 3;
-  private final static Map<String, Object> results = 
-      new HashMap<String, Object>();
+  private static final int CHECKTIMES = 10;
+  
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private Configuration conf = new HdfsConfiguration();
   
   /** 
    * A dummy invocation handler extending RetryInvocationHandler. We can use
@@ -120,7 +137,7 @@ public class TestRetryCacheWithHA {
    * 2. Trigger the NN failover
    * 3. Check the retry cache on the original standby NN
    */
-  @Test
+  @Test (timeout=60000)
   public void testRetryCacheOnStandbyNN() throws Exception {
     // 1. run operations
     DFSTestUtil.runOperations(cluster, dfs, conf, BlockSize, 0);
@@ -180,26 +197,624 @@ public class TestRetryCacheWithHA {
     return client;
   }
   
+  abstract class AtMostOnceOp {
+    private final String name;
+    final DFSClient client;
+    
+    AtMostOnceOp(String name, DFSClient client) {
+      this.name = name;
+      this.client = client;
+    }
+    
+    abstract void prepare() throws Exception;
+    abstract void invoke() throws Exception;
+    abstract boolean checkNamenodeBeforeReturn() throws Exception;
+    abstract Object getResult();
+  }
+  
+  /** createSnapshot operation */
+  class CreateSnapshotOp extends AtMostOnceOp {
+    private String snapshotPath;
+    private String dir;
+    private String snapshotName;
+    
+    CreateSnapshotOp(DFSClient client, String dir, String snapshotName) {
+      super("createSnapshot", client);
+      this.dir = dir;
+      this.snapshotName = snapshotName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+        dfs.allowSnapshot(dirPath);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      this.snapshotPath = client.createSnapshot(dir, snapshotName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          snapshotName);
+      boolean snapshotCreated = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && !snapshotCreated; i++) {
+        Thread.sleep(1000);
+        snapshotCreated = dfs.exists(sPath);
+      }
+      return snapshotCreated;
+    }
+
+    @Override
+    Object getResult() {
+      return snapshotPath;
+    }
+  }
+  
+  /** deleteSnapshot */
+  class DeleteSnapshotOp extends AtMostOnceOp {
+    private String dir;
+    private String snapshotName;
+    
+    DeleteSnapshotOp(DFSClient client, String dir, String snapshotName) {
+      super("deleteSnapshot", client);
+      this.dir = dir;
+      this.snapshotName = snapshotName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+      }
+      
+      Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, snapshotName);
+      if (!dfs.exists(sPath)) {
+        dfs.allowSnapshot(dirPath);
+        dfs.createSnapshot(dirPath, snapshotName);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.deleteSnapshot(dir, snapshotName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          snapshotName);
+      boolean snapshotNotDeleted = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && snapshotNotDeleted; i++) {
+        Thread.sleep(1000);
+        snapshotNotDeleted = dfs.exists(sPath);
+      }
+      return !snapshotNotDeleted;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** renameSnapshot */
+  class RenameSnapshotOp extends AtMostOnceOp {
+    private String dir;
+    private String oldName;
+    private String newName;
+    
+    RenameSnapshotOp(DFSClient client, String dir, String oldName,
+        String newName) {
+      super("renameSnapshot", client);
+      this.dir = dir;
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path dirPath = new Path(dir);
+      if (!dfs.exists(dirPath)) {
+        dfs.mkdirs(dirPath);
+      }
+      
+      Path sPath = SnapshotTestHelper.getSnapshotRoot(dirPath, oldName);
+      if (!dfs.exists(sPath)) {
+        dfs.allowSnapshot(dirPath);
+        dfs.createSnapshot(dirPath, oldName);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.renameSnapshot(dir, oldName, newName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path sPath = SnapshotTestHelper.getSnapshotRoot(new Path(dir),
+          newName);
+      boolean snapshotRenamed = dfs.exists(sPath);
+      for (int i = 0; i < CHECKTIMES && !snapshotRenamed; i++) {
+        Thread.sleep(1000);
+        snapshotRenamed = dfs.exists(sPath);
+      }
+      return snapshotRenamed;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** create file operation (without OverWrite) */
+  class CreateOp extends AtMostOnceOp {
+    private String fileName;
+    private HdfsFileStatus status;
+    
+    CreateOp(DFSClient client, String fileName) {
+      super("create", client);
+      this.fileName = fileName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(fileName);
+      if (dfs.exists(filePath)) {
+        dfs.delete(filePath, true);
+      }
+      final Path fileParent = filePath.getParent();
+      if (!dfs.exists(fileParent)) {
+        dfs.mkdirs(fileParent);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+      this.status = client.getNamenode().create(fileName,
+          FsPermission.getFileDefault(), client.getClientName(),
+          new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
+          BlockSize);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      final Path filePath = new Path(fileName);
+      boolean fileCreated = dfs.exists(filePath);
+      for (int i = 0; i < CHECKTIMES && !fileCreated; i++) {
+        Thread.sleep(1000);
+        fileCreated = dfs.exists(filePath);
+      }
+      return fileCreated;
+    }
+
+    @Override
+    Object getResult() {
+      return status;
+    }
+  }
+  
+  /** append operation */
+  class AppendOp extends AtMostOnceOp {
+    private String fileName;
+    private LocatedBlock lbk;
+    
+    AppendOp(DFSClient client, String fileName) {
+      super("append", client);
+      this.fileName = fileName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(fileName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize / 2, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      lbk = client.getNamenode().append(fileName, client.getClientName());
+    }
+    
+    // check if the inode of the file is under construction
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      INodeFile fileNode = cluster.getNameNode(0).getNamesystem()
+          .getFSDirectory().getINode4Write(fileName).asFile();
+      boolean fileIsUC = fileNode.isUnderConstruction();
+      for (int i = 0; i < CHECKTIMES && !fileIsUC; i++) {
+        Thread.sleep(1000);
+        fileNode = cluster.getNameNode(0).getNamesystem().getFSDirectory()
+            .getINode4Write(fileName).asFile();
+        fileIsUC = fileNode.isUnderConstruction();
+      }
+      return fileIsUC;
+    }
+
+    @Override
+    Object getResult() {
+      return lbk;
+    }
+  }
+  
+  /** rename */
+  class RenameOp extends AtMostOnceOp {
+    private String oldName;
+    private String newName;
+    private boolean renamed;
+    
+    RenameOp(DFSClient client, String oldName, String newName) {
+      super("rename", client);
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(oldName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      }
+    }
+    
+    @SuppressWarnings("deprecation")
+    @Override
+    void invoke() throws Exception {
+      this.renamed = client.rename(oldName, newName);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(newName);
+      boolean renamed = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !renamed; i++) {
+        Thread.sleep(1000);
+        renamed = dfs.exists(targetPath);
+      }
+      return renamed;
+    }
+
+    @Override
+    Object getResult() {
+      return new Boolean(renamed);
+    }
+  }
+  
+  /** rename2 */
+  class Rename2Op extends AtMostOnceOp {
+    private String oldName;
+    private String newName;
+    
+    Rename2Op(DFSClient client, String oldName, String newName) {
+      super("rename2", client);
+      this.oldName = oldName;
+      this.newName = newName;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(oldName);
+      if (!dfs.exists(filePath)) {
+        DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.rename(oldName, newName, Rename.OVERWRITE);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(newName);
+      boolean renamed = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !renamed; i++) {
+        Thread.sleep(1000);
+        renamed = dfs.exists(targetPath);
+      }
+      return renamed;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** concat */
+  class ConcatOp extends AtMostOnceOp {
+    private String target;
+    private String[] srcs;
+    private Path[] srcPaths;
+    
+    ConcatOp(DFSClient client, Path target, int numSrc) {
+      super("concat", client);
+      this.target = target.toString();
+      this.srcs = new String[numSrc];
+      this.srcPaths = new Path[numSrc];
+      Path parent = target.getParent();
+      for (int i = 0; i < numSrc; i++) {
+        srcPaths[i] = new Path(parent, "srcfile" + i);
+        srcs[i] = srcPaths[i].toString();
+      }
+    }
+
+    @Override
+    void prepare() throws Exception {
+      DFSTestUtil.createFile(dfs, new Path(target), BlockSize, DataNodes, 0);
+      for (int i = 0; i < srcPaths.length; i++) {
+        DFSTestUtil.createFile(dfs, srcPaths[i], BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.concat(target, srcs);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(target);
+      boolean done = dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !done; i++) {
+        Thread.sleep(1000);
+        done = dfs.exists(targetPath);
+      }
+      return done;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** delete */
+  class DeleteOp extends AtMostOnceOp {
+    private String target;
+    private boolean deleted;
+    
+    DeleteOp(DFSClient client, String target) {
+      super("delete", client);
+      this.target = target;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      Path p = new Path(target);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      deleted = client.delete(target, true);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path targetPath = new Path(target);
+      boolean del = !dfs.exists(targetPath);
+      for (int i = 0; i < CHECKTIMES && !del; i++) {
+        Thread.sleep(1000);
+        del = !dfs.exists(targetPath);
+      }
+      return del;
+    }
+
+    @Override
+    Object getResult() {
+      return new Boolean(deleted);
+    }
+  }
+  
+  /** createSymlink */
+  class CreateSymlinkOp extends AtMostOnceOp {
+    private String target;
+    private String link;
+    
+    public CreateSymlinkOp(DFSClient client, String target, String link) {
+      super("createSymlink", client);
+      this.target = target;
+      this.link = link;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      Path p = new Path(target);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.createSymlink(target, link, false);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      Path linkPath = new Path(link);
+      FileStatus linkStatus = dfs.getFileLinkStatus(linkPath);
+      for (int i = 0; i < CHECKTIMES && linkStatus == null; i++) {
+        Thread.sleep(1000);
+        linkStatus = dfs.getFileLinkStatus(linkPath);
+      }
+      return linkStatus != null;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  /** updatePipeline */
+  class UpdatePipelineOp extends AtMostOnceOp {
+    private String file;
+    private ExtendedBlock oldBlock;
+    private ExtendedBlock newBlock;
+    private DatanodeInfo[] nodes;
+    private FSDataOutputStream out;
+    
+    public UpdatePipelineOp(DFSClient client, String file) {
+      super("updatePipeline", client);
+      this.file = file;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      final Path filePath = new Path(file);
+      DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
+      // append to the file and leave the last block under construction
+      out = this.client.append(file, BlockSize, null, null);
+      byte[] appendContent = new byte[100];
+      new Random().nextBytes(appendContent);
+      out.write(appendContent);
+      ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      
+      LocatedBlocks blks = dfs.getClient()
+          .getLocatedBlocks(file, BlockSize + 1);
+      assertEquals(1, blks.getLocatedBlocks().size());
+      nodes = blks.get(0).getLocations();
+      oldBlock = blks.get(0).getBlock();
+      
+      LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline(
+          oldBlock, client.getClientName());
+      newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(),
+          oldBlock.getBlockId(), oldBlock.getNumBytes(), 
+          newLbk.getBlock().getGenerationStamp());
+    }
+
+    @Override
+    void invoke() throws Exception {
+      DatanodeInfo[] newNodes = new DatanodeInfo[2];
+      newNodes[0] = nodes[0];
+      newNodes[1] = nodes[1];
+      
+      client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
+          newBlock, newNodes);
+      out.close();
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      INodeFileUnderConstruction fileNode = (INodeFileUnderConstruction) cluster
+          .getNamesystem(0).getFSDirectory().getINode4Write(file).asFile();
+      BlockInfoUnderConstruction blkUC = 
+          (BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
+      int datanodeNum = blkUC.getExpectedLocations().length;
+      for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
+        Thread.sleep(1000);
+        datanodeNum = blkUC.getExpectedLocations().length;
+      }
+      return datanodeNum == 2;
+    }
+    
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+  
+  @Test (timeout=60000)
+  public void testCreateSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateSnapshotOp(client, "/test", "s1");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testDeleteSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new DeleteSnapshotOp(client, "/test", "s1");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRenameSnapshot() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RenameSnapshotOp(client, "/test", "s1", "s2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testCreate() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testAppend() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AppendOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRename() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RenameOp(client, "/file1", "/file2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testRename2() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new Rename2Op(client, "/file1", "/file2");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testConcat() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ConcatOp(client, new Path("/test/file"), 5);
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testDelete() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new DeleteOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testCreateSymlink() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new CreateSymlinkOp(client, "/testfile", "/testlink");
+    testClientRetryWithFailover(op);
+  }
+  
+  @Test (timeout=60000)
+  public void testUpdatePipeline() throws Exception {
+    final DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new UpdatePipelineOp(client, "/testfile");
+    testClientRetryWithFailover(op);
+  }
+  
   /**
    * When NN failover happens, if the client did not receive the response and
    * send a retry request to the other NN, the same response should be recieved
    * based on the retry cache.
-   * 
-   * TODO: currently we only test the createSnapshot from the client side. We 
-   * may need to cover all the calls with "@AtMostOnce" annotation.
    */
-  @Test
-  public void testClientRetryWithFailover() throws Exception {
-    final String dir = "/test";
-    final Path dirPath = new Path(dir);
-    final String sName = "s1";
-    final String dirSnapshot = dir + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR
-        + Path.SEPARATOR + sName;
+  public void testClientRetryWithFailover(final AtMostOnceOp op)
+      throws Exception {
+    final Map<String, Object> results = new HashMap<String, Object>();
     
-    dfs.mkdirs(dirPath);
-    dfs.allowSnapshot(dirPath);
-    
-    final DFSClient client = genClientWithDummyHandler();
+    op.prepare();
     // set DummyRetryInvocationHandler#block to true
     DummyRetryInvocationHandler.block.set(true);
     
@@ -207,28 +822,25 @@ public class TestRetryCacheWithHA {
       @Override
       public void run() {
         try {
-          final String snapshotPath = client.createSnapshot(dir, "s1");
-          assertEquals(dirSnapshot, snapshotPath);
-          LOG.info("Created snapshot " + snapshotPath);
+          op.invoke();
+          Object result = op.getResult();
+          LOG.info("Operation " + op.name + " finished");
           synchronized (TestRetryCacheWithHA.this) {
-            results.put("createSnapshot", snapshotPath);
+            results.put(op.name, result == null ? "SUCCESS" : result);
             TestRetryCacheWithHA.this.notifyAll();
           }
-        } catch (IOException e) {
-          LOG.info("Got IOException " + e + " while creating snapshot");
+        } catch (Exception e) {
+          LOG.info("Got Exception while calling " + op.name, e);
         } finally {
-          IOUtils.cleanup(null, client);
+          IOUtils.cleanup(null, op.client);
         }
       }
     }.start();
     
-    // make sure the client's createSnapshot call has actually been handled by
-    // the active NN
-    boolean snapshotCreated = dfs.exists(new Path(dirSnapshot));
-    while (!snapshotCreated) {
-      Thread.sleep(1000);
-      snapshotCreated = dfs.exists(new Path(dirSnapshot));
-    }
+    // make sure the client's call has actually been handled by the active NN
+    assertTrue("After waiting the operation " + op.name
+        + " still has not taken effect on NN yet",
+        op.checkNamenodeBeforeReturn());
     
     // force the failover
     cluster.transitionToStandby(0);
@@ -238,11 +850,11 @@ public class TestRetryCacheWithHA {
     DummyRetryInvocationHandler.block.set(false);
     
     synchronized (this) {
-      while (!results.containsKey("createSnapshot")) {
+      while (!results.containsKey(op.name)) {
         this.wait();
       }
-      LOG.info("Got the result of createSnapshot: "
-          + results.get("createSnapshot"));
+      LOG.info("Got the result of " + op.name + ": "
+          + results.get(op.name));
     }
   }
 }
\ No newline at end of file