You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/07 21:43:41 UTC
svn commit: r535963 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/io/retry/
src/test/org/apache/hadoop/io/retry/
Author: cutting
Date: Mon May 7 12:43:37 2007
New Revision: 535963
URL: http://svn.apache.org/viewvc?view=rev&rev=535963
Log:
HADOOP-1263. Change DFSClient to retry certain namenode calls with an exponential backoff. Contributed by Hairong.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 12:43:37 2007
@@ -332,6 +332,10 @@
98. HADOOP-1184. Fix HDFS decomissioning to complete when the only
copy of a block is on a decommissioned node. (Dhruba Borthakur via cutting)
+99. HADOOP-1263. Change DFSClient to retry certain namenode calls
+ with a random, exponentially increasing backoff time, to avoid
+ overloading the namenode on, e.g., job start. (Hairong Kuang via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 7 12:43:37 2007
@@ -18,6 +18,9 @@
package org.apache.hadoop.dfs;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
@@ -28,6 +31,7 @@
import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.concurrent.TimeUnit;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@@ -94,6 +98,46 @@
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
+ private static ClientProtocol createNamenode(
+ InetSocketAddress nameNodeAddr, Configuration conf)
+ throws IOException {
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+ 5, 200, TimeUnit.MILLISECONDS);
+ RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ 5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
+
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
+ exceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
+
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+
+ methodNameToPolicyMap.put("open", methodPolicy);
+ methodNameToPolicyMap.put("setReplication", methodPolicy);
+ methodNameToPolicyMap.put("abandonBlock", methodPolicy);
+ methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
+ methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
+ methodNameToPolicyMap.put("exists", methodPolicy);
+ methodNameToPolicyMap.put("isDir", methodPolicy);
+ methodNameToPolicyMap.put("getListing", methodPolicy);
+ methodNameToPolicyMap.put("getHints", methodPolicy);
+ methodNameToPolicyMap.put("renewLease", methodPolicy);
+ methodNameToPolicyMap.put("getStats", methodPolicy);
+ methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
+ methodNameToPolicyMap.put("getBlockSize", methodPolicy);
+ methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
+ methodNameToPolicyMap.put("complete", methodPolicy);
+ methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
+ methodNameToPolicyMap.put("create", methodPolicy);
+
+ return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
+ RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, nameNodeAddr, conf),
+ methodNameToPolicyMap);
+ }
/**
* Create a new DFSClient connected to the given namenode server.
@@ -101,8 +145,7 @@
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
throws IOException {
this.conf = conf;
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, conf);
+ this.namenode = createNamenode(nameNodeAddr, conf);
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
@@ -160,19 +203,12 @@
}
public long getBlockSize(UTF8 f) throws IOException {
- int retries = 4;
- while (true) {
- try {
- return namenode.getBlockSize(f.toString());
- } catch (IOException ie) {
- if (--retries == 0) {
- LOG.warn("Problem getting block size: " +
- StringUtils.stringifyException(ie));
- throw ie;
- }
- LOG.debug("Problem getting block size: " +
- StringUtils.stringifyException(ie));
- }
+ try {
+ return namenode.getBlockSize(f.toString());
+ } catch (IOException ie) {
+ LOG.warn("Problem getting block size: " +
+ StringUtils.stringifyException(ie));
+ throw ie;
}
}
@@ -1133,31 +1169,8 @@
}
private LocatedBlock locateNewBlock() throws IOException {
- int retries = 3;
- while (true) {
- while (true) {
- try {
- return namenode.create(src.toString(), clientName.toString(),
- overwrite, replication, blockSize);
- } catch (RemoteException e) {
- if (--retries == 0 ||
- !AlreadyBeingCreatedException.class.getName().
- equals(e.getClassName())) {
- throw e;
- } else {
- // because failed tasks take upto LEASE_PERIOD to
- // release their pendingCreates files, if the file
- // we want to create is already being created,
- // wait and try again.
- LOG.info(StringUtils.stringifyException(e));
- try {
- Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
- }
+ return namenode.create(src.toString(), clientName.toString(),
+ overwrite, replication, blockSize);
}
private LocatedBlock locateFollowingBlock(long start
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Mon May 7 12:43:37 2007
@@ -67,7 +67,7 @@
}
return null;
}
- LOG.warn("Exception while invoking " + method.getName()
+ LOG.info("Exception while invoking " + method.getName()
+ " of " + implementation.getClass() + ". Retrying."
+ StringUtils.stringifyException(e));
}
@@ -76,6 +76,9 @@
private Object invokeMethod(Method method, Object[] args) throws Throwable {
try {
+ if (!method.isAccessible()) {
+ method.setAccessible(true);
+ }
return method.invoke(implementation, args);
} catch (InvocationTargetException e) {
throw e.getCause();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/retry/RetryPolicies.java Mon May 7 12:43:37 2007
@@ -18,6 +18,7 @@
package org.apache.hadoop.io.retry;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
@@ -82,6 +83,10 @@
return new RetryUpToMaximumCountWithProportionalSleep(maxRetries, sleepTime, timeUnit);
}
+ public static final RetryPolicy exponentialBackoffRetry(
+ int maxRetries, long sleepTime, TimeUnit timeUnit) {
+ return new ExponentialBackoffRetry(maxRetries, sleepTime, timeUnit);
+ }
/**
* <p>
* Set a default policy with some explicit handlers for specific exceptions.
@@ -121,7 +126,7 @@
}
public boolean shouldRetry(Exception e, int retries) throws Exception {
- if (retries > maxRetries) {
+ if (retries >= maxRetries) {
throw e;
}
try {
@@ -184,5 +189,16 @@
}
-
+ static class ExponentialBackoffRetry extends RetryLimited {
+ private Random r = new Random();
+ public ExponentialBackoffRetry(
+ int maxRetries, long sleepTime, TimeUnit timeUnit) {
+ super(maxRetries, sleepTime, timeUnit);
+ }
+
+ @Override
+ protected long calculateSleepTime(int retries) {
+ return sleepTime*r.nextInt(1<<(retries+1));
+ }
+ }
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java?view=diff&rev=535963&r1=535962&r2=535963
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/retry/TestRetryProxy.java Mon May 7 12:43:37 2007
@@ -7,6 +7,7 @@
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
import java.util.Collections;
import java.util.Map;
@@ -91,6 +92,20 @@
UnreliableInterface unreliable = (UnreliableInterface)
RetryProxy.create(UnreliableInterface.class, unreliableImpl,
retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS));
+ unreliable.alwaysSucceeds();
+ unreliable.failsOnceThenSucceeds();
+ try {
+ unreliable.failsTenTimesThenSucceeds();
+ fail("Should fail");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+ public void testExponentialRetry() throws UnreliableException {
+ UnreliableInterface unreliable = (UnreliableInterface)
+ RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+ exponentialBackoffRetry(5, 1L, TimeUnit.NANOSECONDS));
unreliable.alwaysSucceeds();
unreliable.failsOnceThenSucceeds();
try {