Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/06/12 07:33:56 UTC

svn commit: r1349126 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/native/ hadoop-hdfs/src/test/hdfs/ hadoop-hdfs/src/test/ja...

Author: szetszwo
Date: Tue Jun 12 05:33:54 2012
New Revision: 1349126

URL: http://svn.apache.org/viewvc?rev=1349126&view=rev
Log:
svn merge -c 1349124 from trunk for HDFS-3504.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project:r1349124

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1349124

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1349126&r1=1349125&r2=1349126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jun 12 05:33:54 2012
@@ -70,6 +70,10 @@ Release 2.0.1-alpha - UNRELEASED
 
     HDFS-3520. Add transfer rate logging to TransferFsImage. (eli)
 
+    HDFS-3504. Support configurable retry policy in DFSClient for RPC
+    connections and RPC calls, and add MultipleLinearRandomRetry, a new retry
+    policy.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1349124

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1349126&r1=1349125&r2=1349126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Jun 12 05:33:54 2012
@@ -38,6 +38,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled";
+  public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; 
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec";
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
   public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";

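The two keys added above are what a client sets to turn the new retry behaviour on. A minimal sketch of doing that programmatically, assuming only the Configuration API and the key constants introduced in this hunk; the wrapper class name is illustrative and the spec value shown is simply the shipped default ((sleep-time-ms, number-of-retries) pairs), not a tuning recommendation:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableDfsClientRetry {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on the configurable retry policy (it defaults to false).
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
        // Pairs of (sleep-time in ms, number of retries): sleep ~10s between
        // each of 6 retries, then ~60s between each of 10 further retries.
        conf.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, "10000,6,60000,10");
        // A DFSClient/FileSystem built from this conf would pick the policy up
        // through NameNodeProxies.getDefaultRpcRetryPolicy(conf) below.
      }
    }
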
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1349126&r1=1349125&r2=1349126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Jun 12 05:33:54 2012
@@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -66,6 +68,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -240,12 +243,106 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
+  /**
+   * Return the default retry policy used in RPC.
+   * 
+   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
+   * 
+   * Otherwise, first unwrap ServiceException if possible, and then 
+   * (1) use multipleLinearRandomRetry for
+   *     - SafeModeException, or
+   *     - IOException other than RemoteException, or
+   *     - ServiceException; and
+   * (2) use TRY_ONCE_THEN_FAIL for
+   *     - non-SafeMode RemoteException, or
+   *     - non-IOException.
+   *     
+   * Note that dfs.client.retry.max < 0 is not allowed.
+   */
+  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
+    }
+    if (multipleLinearRandomRetry == null) {
+      //no retry
+      return RetryPolicies.TRY_ONCE_THEN_FAIL;
+    } else {
+      return new RetryPolicy() {
+        @Override
+        public RetryAction shouldRetry(Exception e, int retries, int failovers,
+            boolean isMethodIdempotent) throws Exception {
+          if (e instanceof ServiceException) {
+            //unwrap ServiceException
+            final Throwable cause = e.getCause();
+            if (cause != null && cause instanceof Exception) {
+              e = (Exception)cause;
+            }
+          }
+
+          //see (1) and (2) in the javadoc of this method.
+          final RetryPolicy p;
+          if (e instanceof RemoteException) {
+            final RemoteException re = (RemoteException)e;
+            p = SafeModeException.class.getName().equals(re.getClassName())?
+                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
+          } else if (e instanceof IOException || e instanceof ServiceException) {
+            p = multipleLinearRandomRetry;
+          } else { //non-IOException
+            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RETRY " + retries + ") policy="
+                + p.getClass().getSimpleName() + ", exception=" + e);
+          }
+          LOG.info("RETRY " + retries + ") policy="
+              + p.getClass().getSimpleName() + ", exception=" + e);
+          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+        }
+      };
+    }
+  }
+
+  /**
+   * Return the MultipleLinearRandomRetry policy specified in the conf,
+   * or null if the feature is disabled.
+   * If the policy is specified in the conf but the policy cannot be parsed,
+   * the default policy is returned.
+   * 
+   * Conf property: N pairs of sleep-time and number-of-retries
+   *   dfs.client.retry.policy = "s1,n1,s2,n2,..."
+   */
+  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
+    if (!enabled) {
+      return null;
+    }
+
+    final String policy = conf.get(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
+    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+  }
+
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
-    ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies
-        .createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class);
+    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), 0, defaultPolicy).getProxy();
+
     if (withRetries) { // create the proxy with retries
+
       RetryPolicy createPolicy = RetryPolicies
           .retryUpToMaximumCountWithFixedSleep(5,
               HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@@ -258,17 +355,21 @@ public class NameNodeProxies {
       Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
                  = new HashMap<Class<? extends Exception>, RetryPolicy>();
       exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-          .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+          .retryByRemoteException(defaultPolicy,
               remoteExceptionToPolicyMap));
       RetryPolicy methodPolicy = RetryPolicies.retryByException(
-          RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+          defaultPolicy, exceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap 
                  = new HashMap<String, RetryPolicy>();
     
       methodNameToPolicyMap.put("create", methodPolicy);
     
-      proxy = (ClientNamenodeProtocolPB) RetryProxy
-          .create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+      proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
+          ClientNamenodeProtocolPB.class,
+          new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
+              ClientNamenodeProtocolPB.class, proxy),
+          methodNameToPolicyMap,
+          defaultPolicy);
     }
     return new ClientNamenodeProtocolTranslatorPB(proxy);
   }

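For reference on the "s1,n1,s2,n2,..." spec format described in the javadoc above, the default "10000,6,60000,10" parses into two linear elements: 6 retries with ~10-second sleeps followed by 10 retries with ~60-second sleeps. A minimal sketch using the parse helper that TestDFSClientRetries exercises further down; the wrapper class is illustrative, and the printed string follows that test's expected toString() form:

    import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;

    public class ParseRetrySpec {
      public static void main(String[] args) {
        // Default value of dfs.client.retry.policy.spec.
        MultipleLinearRandomRetry r =
            MultipleLinearRandomRetry.parseCommaSeparatedString("10000,6,60000,10");
        // Each (sleep, retries) pair renders as "<retries>x<sleep>ms", so this prints:
        //   MultipleLinearRandomRetry[6x10000ms, 10x60000ms]
        System.out.println(r);
        // Unparsable specs (odd field count, negatives, non-numbers) return null,
        // in which case NameNodeProxies falls back to the default spec.
      }
    }
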
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1349124

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1349124

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1349126&r1=1349125&r2=1349126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jun 12 05:33:54 2012
@@ -25,8 +25,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -39,6 +37,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 
@@ -66,12 +66,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
-import org.apache.hadoop.ha.HAServiceProtocolHelper;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -1375,7 +1372,6 @@ public class MiniDFSCluster {
       waitClusterUp();
       LOG.info("Restarted the namenode");
       waitActive();
-      LOG.info("Cluster is active");
     }
   }
 
@@ -1751,6 +1747,7 @@ public class MiniDFSCluster {
         }
       }
     }
+    LOG.info("Cluster is active");
   }
   
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1349126&r1=1349125&r2=1349126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Jun 12 05:33:54 2012
@@ -25,44 +25,51 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-import java.net.SocketTimeoutException;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.log4j.Level;
 import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
@@ -339,7 +346,7 @@ public class TestDFSClientRetries extend
 
           // We shouldn't have gained an extra block by the RPC.
           assertEquals(blockCount, blockCount2);
-          return (LocatedBlock) ret2;
+          return ret2;
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
@@ -744,5 +751,149 @@ public class TestDFSClientRetries extend
       server.stop();
     }
   }
-}
 
+  /** Test client retry with namenode restarting. */
+  public void testNamenodeRestart() throws Exception {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    final Path dir = new Path("/testNamenodeRestart");
+
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+
+    final short numDatanodes = 3;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .build();
+    try {
+      cluster.waitActive();
+
+      //create a file
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final long length = 1L << 20;
+      final Path file1 = new Path(dir, "foo"); 
+      DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
+
+      //get file status
+      final FileStatus s1 = dfs.getFileStatus(file1);
+      assertEquals(length, s1.getLen());
+
+      //shutdown namenode
+      cluster.shutdownNameNode(0);
+
+      //namenode is down, create another file in a thread
+      final Path file3 = new Path(dir, "file"); 
+      final Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //it should retry till namenode is up.
+            final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+            DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      });
+      thread.start();
+
+      //restart namenode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep, restart, and then wait active
+            TimeUnit.SECONDS.sleep(30);
+            cluster.restartNameNode(0, false);
+            cluster.waitActive();
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is down, it should retry until namenode is up again. 
+      final FileStatus s2 = dfs.getFileStatus(file1);
+      assertEquals(s1, s2);
+
+      //check file1 and file3
+      thread.join();
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
+
+      //enter safe mode
+      dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      
+      //leave safe mode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep and then leave safe mode
+            TimeUnit.SECONDS.sleep(30);
+            dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is in safe mode, create should retry until it leaves safe mode.
+      final Path file2 = new Path(dir, "bar");
+      DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
+      
+      //make sure it won't retry on exceptions like FileNotFoundException
+      final Path nonExisting = new Path(dir, "nonExisting");
+      LOG.info("setPermission: " + nonExisting);
+      try {
+        dfs.setPermission(nonExisting, new FsPermission((short)0));
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        LOG.info("GOOD!", fnfe);
+      }
+
+      if (!exceptions.isEmpty()) {
+        LOG.error("There are " + exceptions.size() + " exception(s):");
+        for(int i = 0; i < exceptions.size(); i++) {
+          LOG.error("Exception " + i, exceptions.get(i));
+        }
+        fail();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public void testMultipleLinearRandomRetry() {
+    parseMultipleLinearRandomRetry(null, "");
+    parseMultipleLinearRandomRetry(null, "11");
+    parseMultipleLinearRandomRetry(null, "11,22,33");
+    parseMultipleLinearRandomRetry(null, "11,22,33,44,55");
+    parseMultipleLinearRandomRetry(null, "AA");
+    parseMultipleLinearRandomRetry(null, "11,AA");
+    parseMultipleLinearRandomRetry(null, "11,22,33,FF");
+    parseMultipleLinearRandomRetry(null, "11,-22");
+    parseMultipleLinearRandomRetry(null, "-11,22");
+
+    parseMultipleLinearRandomRetry("[22x11ms]",
+        "11,22");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]",
+        "11,22,33,44");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "11,22,33,44,55,66");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "   11,   22, 33,  44, 55,  66   ");
+  }
+  
+  static void parseMultipleLinearRandomRetry(String expected, String s) {
+    final MultipleLinearRandomRetry r = MultipleLinearRandomRetry.parseCommaSeparatedString(s);
+    LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected);
+    if (r == null) {
+      assertEquals(expected, null);
+    } else {
+      assertEquals("MultipleLinearRandomRetry" + expected, r.toString());
+    }
+  }
+}