You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/01 01:27:41 UTC

svn commit: r799769 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Author: szetszwo
Date: Fri Jul 31 23:27:40 2009
New Revision: 799769

URL: http://svn.apache.org/viewvc?rev=799769&view=rev
Log:
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write. Contributed by Bill Zeller.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Jul 31 23:27:40 2009
@@ -123,6 +123,9 @@
     HDFS-119. Fix a bug in logSync(), which causes NameNode block forever.
     (Suresh Srinivas via shv)
 
+    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+    (Bill Zeller via szetszwo)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jul 31 23:27:40 2009
@@ -127,8 +127,8 @@
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  final private ClientProtocol namenode;
-  final private ClientProtocol rpcNamenode;
+  private ClientProtocol namenode;
+  private ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -219,6 +219,29 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
+    this(conf, stats);
+    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+    this.namenode = createNamenode(this.rpcNamenode);
+  }
+
+  /** 
+   * Create a new DFSClient connected to the given namenode
+   * and rpcNamenode objects.
+   * 
+   * This constructor was written to allow easy testing of the DFSClient class.
+   * End users will most likely want to use one of the other constructors.
+   */
+  public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+                   Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
+      this(conf, stats);
+      this.namenode = namenode;
+      this.rpcNamenode = rpcNamenode;
+  }
+
+  
+  private DFSClient(Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
     this.conf = conf;
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
@@ -240,9 +263,6 @@
       throw (IOException)(new IOException().initCause(e));
     }
 
-    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-    this.namenode = createNamenode(rpcNamenode);
-
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -2856,7 +2876,7 @@
       }
 
       private LocatedBlock locateFollowingBlock(long start) throws IOException {
-        int retries = 5;
+        int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
         long sleeptime = 400;
         while (true) {
           long localstart = System.currentTimeMillis();
@@ -2872,26 +2892,32 @@
               if (ue != e) { 
                 throw ue; // no need to retry these exceptions
               }
-
-              if (--retries == 0 && 
-                  !NotReplicatedYetException.class.getName().
+              
+              
+              if (NotReplicatedYetException.class.getName().
                   equals(e.getClassName())) {
-                throw e;
-              } else {
-                LOG.info(StringUtils.stringifyException(e));
-                if (System.currentTimeMillis() - localstart > 5000) {
-                  LOG.info("Waiting for replication for "
-                      + (System.currentTimeMillis() - localstart) / 1000
-                      + " seconds");
-                }
-                try {
-                  LOG.warn("NotReplicatedYetException sleeping " + src
-                      + " retries left " + retries);
-                  Thread.sleep(sleeptime);
-                  sleeptime *= 2;
-                } catch (InterruptedException ie) {
+                if (retries == 0) { 
+                  throw e;
+                } else {
+                  --retries;
+                  LOG.info(StringUtils.stringifyException(e));
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Waiting for replication for "
+                        + (System.currentTimeMillis() - localstart) / 1000
+                        + " seconds");
+                  }
+                  try {
+                    LOG.warn("NotReplicatedYetException sleeping " + src
+                        + " retries left " + retries);
+                    Thread.sleep(sleeptime);
+                    sleeptime *= 2;
+                  } catch (InterruptedException ie) {
+                  }
                 }
-              }                
+              } else {
+                throw e;
+              }
+
             }
           }
         } 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Jul 31 23:27:40 2009
@@ -21,10 +21,18 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
 
 import junit.framework.TestCase;
 
@@ -34,6 +42,8 @@
  * properly in case of errors.
  */
 public class TestDFSClientRetries extends TestCase {
+  public static final Log LOG =
+    LogFactory.getLog(TestDFSClientRetries.class.getName());
   
   // writes 'len' bytes of data to out.
   private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@
   }
   
   // more tests related to different failure cases can be added here.
+  
+  class TestNameNode implements ClientProtocol
+  {
+    int num_calls = 0;
+    
+    // The total number of calls that can be made to addBlock
+    // before an exception is thrown
+    int num_calls_allowed; 
+    public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
+                                             + "TestDFSClientRetries::"
+                                             + "TestNameNode::addBlock";
+    public final String RETRY_CONFIG
+          = "dfs.client.block.write.locateFollowingBlock.retries";
+          
+    public TestNameNode(Configuration conf) throws IOException
+    {
+      // +1 because the configuration value is the number of retries and
+      // the first call is not a retry (e.g., 2 retries == 3 total
+      // calls allowed)
+      this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
+    }
+
+    public long getProtocolVersion(String protocol, 
+                                     long clientVersion)
+    throws IOException
+    {
+      return versionID;
+    }
+
+    public LocatedBlock addBlock(String src, String clientName)
+    throws IOException
+    {
+      num_calls++;
+      if (num_calls > num_calls_allowed) { 
+        throw new IOException("addBlock called more times than "
+                              + RETRY_CONFIG
+                              + " allows.");
+      } else {
+          throw new RemoteException(NotReplicatedYetException.class.getName(),
+                                    ADD_BLOCK_EXCEPTION);
+      }
+    }
+    
+    
+    // The following methods are stub methods that are not needed by this mock class
+    
+    public LocatedBlocks  getBlockLocations(String src, long offset, long length) throws IOException { return null; }
+    
+    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
+    
+    public LocatedBlock append(String src, String clientName) throws IOException { return null; }
+
+    public boolean setReplication(String src, short replication) throws IOException { return false; }
+
+    public void setPermission(String src, FsPermission permission) throws IOException {}
+
+    public void setOwner(String src, String username, String groupname) throws IOException {}
+
+    public void abandonBlock(Block b, String src, String holder) throws IOException {}
+
+    public boolean complete(String src, String clientName) throws IOException { return false; }
+
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
+
+    public boolean rename(String src, String dst) throws IOException { return false; }
+
+    public boolean delete(String src) throws IOException { return false; }
+
+    public boolean delete(String src, boolean recursive) throws IOException { return false; }
+
+    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+
+    public FileStatus[] getListing(String src) throws IOException { return null; }
+
+    public void renewLease(String clientName) throws IOException {}
+
+    public long[] getStats() throws IOException { return null; }
+
+    public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException { return null; }
+
+    public long getPreferredBlockSize(String filename) throws IOException { return 0; }
+
+    public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return false; }
+
+    public void saveNamespace() throws IOException {}
+
+    public boolean restoreFailedStorage(String arg) throws AccessControlException { return false; }
+
+    public void refreshNodes() throws IOException {}
+
+    public void finalizeUpgrade() throws IOException {}
+
+    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException { return null; }
+
+    public void metaSave(String filename) throws IOException {}
+
+    public FileStatus getFileInfo(String src) throws IOException { return null; }
+
+    public ContentSummary getContentSummary(String path) throws IOException { return null; }
+
+    public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {}
+
+    public void fsync(String src, String client) throws IOException {}
+
+    public void setTimes(String src, long mtime, long atime) throws IOException {}
+
+  }
+  
+  public void testNotYetReplicatedErrors() throws IOException
+  {   
+    Configuration conf = new Configuration();
+    
+    // allow 1 retry (2 total calls)
+    conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
+        
+    TestNameNode tnn = new TestNameNode(conf);
+    DFSClient client = new DFSClient(tnn, tnn, conf, null);
+    OutputStream os = client.create("testfile", true);
+    os.write(20); // write one random byte
+    
+    try {
+      os.close();
+    } catch (Exception e) {
+      assertTrue("Retries are not being stopped correctly",
+           e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
+    }
+  }
+  
 }