Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/07 21:43:41 UTC

svn commit: r535963 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/io/retry/ src/test/org/apache/hadoop/io/retry/

Author: cutting
Date: Mon May  7 12:43:37 2007
New Revision: 535963

URL: http://svn.apache.org/viewvc?view=rev&rev=535963
Log:
HADOOP-1263.  Change DFSClient to retry certain namenode calls with an exponential backoff.  Contributed by Hairong.
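
For a sense of the schedule this introduces (parameters taken from the new
createNamenode() in DFSClient.java below: 200 ms base sleep, at most 5
retries), the delay before each retry is the base time multiplied by a random
integer whose range doubles on every attempt. A standalone sketch of that
computation, not code from the commit:

import java.util.Random;

public class BackoffSchedule {
  public static void main(String[] args) {
    Random r = new Random();
    long baseMillis = 200;  // base sleep, as configured in createNamenode()
    for (int retries = 1; retries <= 5; retries++) {
      // Same formula as ExponentialBackoffRetry.calculateSleepTime() below:
      // base * uniform{0, ..., 2^(retries+1) - 1}
      long sleep = baseMillis * r.nextInt(1 << (retries + 1));
      long max = baseMillis * ((1 << (retries + 1)) - 1);
      System.out.println("retry " + retries + ": sleeping " + sleep
          + " ms (upper bound " + max + " ms)");
    }
  }
}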

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May  7 12:43:37 2007
@@ -332,6 +332,10 @@
 98. HADOOP-1184.  Fix HDFS decommissioning to complete when the only
     copy of a block is on a decommissioned node. (Dhruba Borthakur via cutting)
 
+99. HADOOP-1263.  Change DFSClient to retry certain namenode calls
+    with a random, exponentially increasing backoff time, to avoid
+    overloading the namenode on, e.g., job start.  (Hairong Kuang via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May  7 12:43:37 2007
@@ -18,6 +18,9 @@
 package org.apache.hadoop.dfs;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
@@ -28,6 +31,7 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -94,6 +98,45 @@
     Runtime.getRuntime().addShutdownHook(clientFinalizer);
   }
 
+  private static ClientProtocol createNamenode(
+      InetSocketAddress nameNodeAddr, Configuration conf)
+    throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+        5, 200, TimeUnit.MILLISECONDS);
+    RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
+    
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
+    exceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
+
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+    
+    methodNameToPolicyMap.put("open", methodPolicy);
+    methodNameToPolicyMap.put("setReplication", methodPolicy);
+    methodNameToPolicyMap.put("abandonBlock", methodPolicy);
+    methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
+    methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
+    methodNameToPolicyMap.put("exists", methodPolicy);
+    methodNameToPolicyMap.put("isDir", methodPolicy);
+    methodNameToPolicyMap.put("getListing", methodPolicy);
+    methodNameToPolicyMap.put("getHints", methodPolicy);
+    methodNameToPolicyMap.put("renewLease", methodPolicy);
+    methodNameToPolicyMap.put("getStats", methodPolicy);
+    methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
+    methodNameToPolicyMap.put("getBlockSize", methodPolicy);
+    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
+    methodNameToPolicyMap.put("complete", methodPolicy);
+    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
+    methodNameToPolicyMap.put("create", methodPolicy);
+
+    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
+        RPC.getProxy(ClientProtocol.class,
+            ClientProtocol.versionID, nameNodeAddr, conf),
+        methodNameToPolicyMap);
+  }
         
   /** 
    * Create a new DFSClient connected to the given namenode server.
@@ -101,8 +145,7 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
     this.conf = conf;
-    this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-                                                  ClientProtocol.versionID, nameNodeAddr, conf);
+    this.namenode = createNamenode(nameNodeAddr, conf);
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -160,19 +203,12 @@
   }
     
   public long getBlockSize(UTF8 f) throws IOException {
-    int retries = 4;
-    while (true) {
-      try {
-        return namenode.getBlockSize(f.toString());
-      } catch (IOException ie) {
-        if (--retries == 0) {
-          LOG.warn("Problem getting block size: " + 
-                   StringUtils.stringifyException(ie));
-          throw ie;
-        }
-        LOG.debug("Problem getting block size: " + 
-                  StringUtils.stringifyException(ie));
-      }
+    try {
+      return namenode.getBlockSize(f.toString());
+    } catch (IOException ie) {
+      LOG.warn("Problem getting block size: " + 
+          StringUtils.stringifyException(ie));
+      throw ie;
     }
   }
 
@@ -1133,31 +1169,8 @@
     }
 
     private LocatedBlock locateNewBlock() throws IOException {     
-      int retries = 3;
-      while (true) {
-        while (true) {
-          try {
-            return namenode.create(src.toString(), clientName.toString(),
-                                   overwrite, replication, blockSize);
-          } catch (RemoteException e) {
-            if (--retries == 0 || 
-                !AlreadyBeingCreatedException.class.getName().
-                equals(e.getClassName())) {
-              throw e;
-            } else {
-              // because failed tasks take upto LEASE_PERIOD to
-              // release their pendingCreates files, if the file
-              // we want to create is already being created, 
-              // wait and try again.
-              LOG.info(StringUtils.stringifyException(e));
-              try {
-                Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
-              } catch (InterruptedException ie) {
-              }
-            }
-          }
-        }
-      }
+      return namenode.create(src.toString(), clientName.toString(),
+          overwrite, replication, blockSize);
     }
         
     private LocatedBlock locateFollowingBlock(long start

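The wiring above shows the general pattern for the io.retry package: build
policies with the RetryPolicies factory methods, map method names to policies,
and wrap the raw RPC proxy via RetryProxy.create(). A minimal sketch against a
made-up interface (FlakyService and fetch() are illustrative, not part of
Hadoop):

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;

public class RetryProxyDemo {
  // Hypothetical service interface, for illustration only.
  interface FlakyService {
    String fetch(String key) throws IOException;
  }

  static FlakyService wrap(FlakyService raw) {
    // Same policy shape as createNamenode(): up to 5 retries with a
    // random, exponentially growing sleep on a 200 ms base.
    RetryPolicy backoff =
        RetryPolicies.exponentialBackoffRetry(5, 200, TimeUnit.MILLISECONDS);
    // Only "fetch" gets the backoff policy in this map.
    return (FlakyService) RetryProxy.create(FlakyService.class, raw,
        Collections.singletonMap("fetch", backoff));
  }
}
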
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Mon May  7 12:43:37 2007
@@ -67,7 +67,7 @@
           }
           return null;
         }
-        LOG.warn("Exception while invoking " + method.getName()
+        LOG.info("Exception while invoking " + method.getName()
                  + " of " + implementation.getClass() + ". Retrying."
                  + StringUtils.stringifyException(e));
       }
@@ -76,6 +76,9 @@
 
   private Object invokeMethod(Method method, Object[] args) throws Throwable {
     try {
+      if (!method.isAccessible()) {
+        method.setAccessible(true);
+      }
       return method.invoke(implementation, args);
     } catch (InvocationTargetException e) {
       throw e.getCause();

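RetryInvocationHandler is a java.lang.reflect dynamic proxy, which is why the
setAccessible() guard matters: without it, reflective calls through proxies of
non-public interfaces can fail with IllegalAccessException. A simplified,
self-contained sketch of the same pattern (the real class's policy lookup and
logging are omitted; a bare maxRetries stands in for a RetryPolicy):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class SimpleRetryHandler implements InvocationHandler {
  private final Object target;
  private final int maxRetries;

  public SimpleRetryHandler(Object target, int maxRetries) {
    this.target = target;
    this.maxRetries = maxRetries;
  }

  public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    for (int retries = 0; ; retries++) {
      try {
        if (!method.isAccessible()) {
          method.setAccessible(true);  // the guard this commit adds
        }
        return method.invoke(target, args);
      } catch (InvocationTargetException e) {
        if (retries >= maxRetries) {
          throw e.getCause();  // unwrap and rethrow the real cause
        }
        // A real policy would sleep here before the next attempt.
      }
    }
  }

  @SuppressWarnings("unchecked")
  public static <T> T wrap(Class<T> iface, T target, int maxRetries) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(),
        new Class<?>[] { iface }, new SimpleRetryHandler(target, maxRetries));
  }
}
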
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java Mon May  7 12:43:37 2007
@@ -18,6 +18,7 @@
 package org.apache.hadoop.io.retry;
 
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -82,6 +83,10 @@
     return new RetryUpToMaximumCountWithProportionalSleep(maxRetries, sleepTime, timeUnit);
   }
   
+  public static final RetryPolicy exponentialBackoffRetry(
+      int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    return new ExponentialBackoffRetry(maxRetries, sleepTime, timeUnit);
+  }
   /**
    * <p>
    * Set a default policy with some explicit handlers for specific exceptions.
@@ -121,7 +126,7 @@
     }
 
     public boolean shouldRetry(Exception e, int retries) throws Exception {
-      if (retries > maxRetries) {
+      if (retries >= maxRetries) {
         throw e;
       }
       try {
@@ -184,5 +189,16 @@
     
   }
   
-  
+  static class ExponentialBackoffRetry extends RetryLimited {
+    private Random r = new Random();
+    public ExponentialBackoffRetry(
+        int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      super(maxRetries, sleepTime, timeUnit);
+    }
+    
+    @Override
+    protected long calculateSleepTime(int retries) {
+      return sleepTime*r.nextInt(1<<(retries+1));
+    }
+  }
 }

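Two details in this file are easy to miss. First, the boundary test in
RetryLimited.shouldRetry() changed from > to >=, so a policy built with
maxRetries = N now gives up after N retries rather than N + 1. Second,
ExponentialBackoffRetry randomizes every sleep, which spreads out clients that
fail at the same moment. A throwaway probe of the boundary behavior, calling
the policy object directly (how the invocation handler counts attempts is
internal; this only exercises the public shouldRetry contract):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class BoundaryProbe {
  public static void main(String[] args) {
    RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        3, 1, TimeUnit.NANOSECONDS);
    Exception failure = new Exception("simulated failure");
    for (int retries = 1; ; retries++) {
      try {
        policy.shouldRetry(failure, retries);
        System.out.println("retry " + retries + ": allowed");
      } catch (Exception e) {
        // With the >= check, this fires when retries reaches 3, not 4.
        System.out.println("retry " + retries + ": policy gave up");
        break;
      }
    }
  }
}
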
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java Mon May  7 12:43:37 2007
@@ -7,6 +7,7 @@
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
 
 import java.util.Collections;
 import java.util.Map;
@@ -91,6 +92,20 @@
     UnreliableInterface unreliable = (UnreliableInterface)
       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
                         retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS));
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.failsTenTimesThenSucceeds();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testExponentialRetry() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+                        exponentialBackoffRetry(5, 1L, TimeUnit.NANOSECONDS));
     unreliable.alwaysSucceeds();
     unreliable.failsOnceThenSucceeds();
     try {