You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/04/23 18:35:50 UTC
[2/2] hadoop git commit: HADOOP-10597. RPC Server signals backoff to
clients when all request queues are full. (Contributed by Ming Ma)
HADOOP-10597. RPC Server signals backoff to clients when all request queues are full. (Contributed by Ming Ma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/edbeefdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/edbeefdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/edbeefdb
Branch: refs/heads/branch-2
Commit: edbeefdb052bffd0a3ba2f00c8a5a376017fbf33
Parents: 1cd2fcf
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Apr 23 09:35:04 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Apr 23 09:35:15 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/fs/CommonConfigurationKeys.java | 2 +
.../org/apache/hadoop/ipc/CallQueueManager.java | 20 ++++++-
.../main/java/org/apache/hadoop/ipc/Server.java | 36 +++++++++++-
.../apache/hadoop/ipc/metrics/RpcMetrics.java | 10 ++++
.../apache/hadoop/ipc/TestCallQueueManager.java | 6 +-
.../java/org/apache/hadoop/ipc/TestRPC.java | 58 ++++++++++++++++++++
7 files changed, 128 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 307379e..a9a365d 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -58,6 +58,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11827. Speed-up distcp buildListing() using threadpool
(Zoran Dimitrijevic via raviprak)
+ HADOOP-10597. RPC Server signals backoff to clients when all request
+ queues are full. (Ming Ma via Arpit Agarwal)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 7575496..2721466 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+ public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
+ public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
/** This is for specifying the implementation for the mappings from
* hostnames to the racks they belong to
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 27949d0..1568bd6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -38,16 +38,19 @@ public class CallQueueManager<E> {
Class<?> queneClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queneClass;
}
-
+ private final boolean clientBackOffEnabled;
+
// Atomic refs point to active callQueue
// We have two so we can better control swapping
private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef;
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
- int maxQueueSize, String namespace, Configuration conf) {
+ boolean clientBackOffEnabled, int maxQueueSize, String namespace,
+ Configuration conf) {
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf);
+ this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue " + backingClass);
@@ -89,6 +92,10 @@ public class CallQueueManager<E> {
" could not be constructed.");
}
+ boolean isClientBackoffEnabled() {
+ return clientBackOffEnabled;
+ }
+
/**
* Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the
@@ -99,6 +106,15 @@ public class CallQueueManager<E> {
}
/**
+ * Insert e into the backing queue.
+ * Return true if e is queued.
+ * Return false if the queue is full.
+ */
+ public boolean offer(E e) throws InterruptedException {
+ return putRef.get().offer(e);
+ }
+
+ /**
* Retrieve an E from the backing queue or block until we can.
* Guaranteed to return an element from the current queue.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index a5a90d3..c80c022 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -500,6 +500,17 @@ public abstract class Server {
callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
}
+ /**
+ * Read from the configuration whether client backoff is enabled for the given prefix.
+ */
+ static boolean getClientBackoffEnable(
+ String prefix, Configuration conf) {
+ String name = prefix + "." +
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
+ return conf.getBoolean(name,
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
+ }
+
/** A call queued for handling. */
public static class Call implements Schedulable {
private final int callId; // the client's call id
@@ -1877,10 +1888,31 @@ public abstract class Server {
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
- callQueue.put(call); // queue the call; maybe blocked here
+ if (callQueue.isClientBackoffEnabled()) {
+ // if RPC queue is full, we will ask the RPC client to back off by
+ // throwing RetriableException. Whether RPC client will honor
+ // RetriableException and retry depends on client ipc retry policy.
+ // For example, FailoverOnNetworkExceptionRetry handles
+ // RetriableException.
+ queueRequestOrAskClientToBackOff(call);
+ } else {
+ callQueue.put(call); // queue the call; maybe blocked here
+ }
incRpcCount(); // Increment the rpc count
}
+ private void queueRequestOrAskClientToBackOff(Call call)
+ throws WrappedRpcServerException, InterruptedException {
+ // If rpc queue is full, we will ask the client to back off.
+ boolean isCallQueued = callQueue.offer(call);
+ if (!isCallQueued) {
+ rpcMetrics.incrClientBackoff();
+ RetriableException retriableException =
+ new RetriableException("Server is too busy.");
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
+ }
+ }
/**
* Establish RPC connection setup by negotiating SASL if required, then
@@ -2207,7 +2239,7 @@ public abstract class Server {
// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
- maxQueueSize, prefix, conf);
+ getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 5eba44a..e90e516 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -95,6 +95,8 @@ public class RpcMetrics {
MutableCounterLong rpcAuthorizationFailures;
@Metric("Number of authorization sucesses")
MutableCounterLong rpcAuthorizationSuccesses;
+ @Metric("Number of client backoff requests")
+ MutableCounterLong rpcClientBackoff;
@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
@@ -192,4 +194,12 @@ public class RpcMetrics {
}
}
}
+
+ /**
+ * Record one client backoff event by incrementing the rpcClientBackoff counter.
+ */
+ //@Override
+ public void incrClientBackoff() {
+ rpcClientBackoff.incr();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 1b618b1..6e1838e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -143,21 +143,21 @@ public class TestCallQueueManager {
@Test
public void testCallQueueCapacity() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
}
@Test
public void testEmptyConsume() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
assertCanTake(manager, 0, 1); // Fails since it's empty
}
@Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edbeefdb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 8a4dcb6..1ec46f6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1077,6 +1077,64 @@ public class TestRPC {
}
}
+ /**
+ * Test RPC backoff.
+ */
+ @Test (timeout=30000)
+ public void testClientBackOff() throws Exception {
+ boolean succeeded = false;
+ final int numClients = 2;
+ final List<Future<Void>> res = new ArrayList<Future<Void>>();
+ final ExecutorService executorService =
+ Executors.newFixedThreadPool(numClients);
+ final Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+ conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
+ ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+ final Server server = new RPC.Builder(conf)
+ .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+ .setBindAddress(ADDRESS).setPort(0)
+ .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+ .build();
+ server.start();
+
+ final TestProtocol proxy =
+ RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
+ NetUtils.getConnectAddress(server), conf);
+ try {
+ // Start a sleep RPC call to consume the only handler thread.
+ // Start another sleep RPC call to make the callQueue full.
+ // A third call, issued after the wait loop below, should then get a RetriableException.
+ for (int i = 0; i < numClients; i++) {
+ res.add(executorService.submit(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws IOException, InterruptedException {
+ proxy.sleep(100000);
+ return null;
+ }
+ }));
+ }
+ while (server.getCallQueueLen() != 1
+ && countThreads(CallQueueManager.class.getName()) != 1) {
+ Thread.sleep(100);
+ }
+ try {
+ proxy.sleep(100);
+ } catch (RemoteException e) {
+ IOException unwrapExeption = e.unwrapRemoteException();
+ if (unwrapExeption instanceof RetriableException) {
+ succeeded = true;
+ }
+ }
+ } finally {
+ server.stop();
+ RPC.stopProxy(proxy);
+ executorService.shutdown();
+ }
+ assertTrue("RetriableException not received", succeeded);
+ }
+
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);