You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2016/03/31 17:48:12 UTC
[2/2] hadoop git commit: HADOOP-12916. Allow RPC scheduler/callqueue
backoff using response times. Contributed by Xiaoyu Yao.
HADOOP-12916. Allow RPC scheduler/callqueue backoff using response times. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d95c6eb3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d95c6eb3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d95c6eb3
Branch: refs/heads/trunk
Commit: d95c6eb32cec7768ac418fb467b1198ccf3cf0dc
Parents: 0a74610
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Mar 31 08:42:57 2016 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Mar 31 08:42:57 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/conf/Configuration.java | 13 +
.../hadoop/fs/CommonConfigurationKeys.java | 14 +-
.../org/apache/hadoop/ipc/CallQueueManager.java | 124 +++++-
.../apache/hadoop/ipc/DecayRpcScheduler.java | 396 +++++++++++++++----
.../hadoop/ipc/DecayRpcSchedulerMXBean.java | 2 +
.../apache/hadoop/ipc/DefaultRpcScheduler.java | 45 +++
.../org/apache/hadoop/ipc/FairCallQueue.java | 45 +--
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 8 +-
.../org/apache/hadoop/ipc/RpcScheduler.java | 8 +-
.../java/org/apache/hadoop/ipc/Schedulable.java | 5 +-
.../main/java/org/apache/hadoop/ipc/Server.java | 77 +++-
.../apache/hadoop/ipc/WritableRpcEngine.java | 45 +--
.../apache/hadoop/ipc/TestCallQueueManager.java | 147 ++++++-
.../hadoop/ipc/TestDecayRpcScheduler.java | 42 +-
.../apache/hadoop/ipc/TestFairCallQueue.java | 79 ++--
.../hadoop/ipc/TestIdentityProviders.java | 18 +-
.../java/org/apache/hadoop/ipc/TestRPC.java | 92 ++++-
17 files changed, 893 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
index 8355d96..4c8f27b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
@@ -1626,6 +1626,10 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return defaultValue;
}
vStr = vStr.trim();
+ return getTimeDurationHelper(name, vStr, unit);
+ }
+
+ private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
if (null == vUnit) {
LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
@@ -1636,6 +1640,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
return unit.convert(Long.parseLong(vStr), vUnit.unit());
}
+ public long[] getTimeDurations(String name, TimeUnit unit) {
+ String[] strings = getTrimmedStrings(name);
+ long[] durations = new long[strings.length];
+ for (int i = 0; i < strings.length; i++) {
+ durations[i] = getTimeDurationHelper(name, strings[i], unit);
+ }
+ return durations;
+ }
+
/**
* Get the value of the <code>name</code> property as a <code>Pattern</code>.
* If no such property is specified, or if the specified value is not a valid
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 9b4069a..a708900 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -90,14 +90,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/**
* CallQueue related settings. These are not used directly, but rather
* combined with a namespace and port. For instance:
- * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
+ * IPC_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
*/
- public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
+ public static final String IPC_NAMESPACE = "ipc";
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
- public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+ public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
+ public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
+ /**
+ * IPC scheduler priority levels.
+ */
+ public static final String IPC_SCHEDULER_PRIORITY_LEVELS_KEY =
+ "scheduler.priority.levels";
+ public static final int IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY = 4;
+
/** This is for specifying the implementation for the mappings from
* hostnames to the racks they belong to
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 2ee15d3..1a7782a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* Abstracts queue operations for different blocking queues.
@@ -43,6 +44,13 @@ public class CallQueueManager<E> {
Class<?> queueClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queueClass;
}
+
+ @SuppressWarnings("unchecked")
+ static Class<? extends RpcScheduler> convertSchedulerClass(
+ Class<?> schedulerClass) {
+ return (Class<? extends RpcScheduler>)schedulerClass;
+ }
+
private final boolean clientBackOffEnabled;
// Atomic refs point to active callQueue
@@ -50,25 +58,76 @@ public class CallQueueManager<E> {
private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef;
+ private RpcScheduler scheduler;
+
public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
+ Class<? extends RpcScheduler> schedulerClass,
boolean clientBackOffEnabled, int maxQueueSize, String namespace,
Configuration conf) {
+ int priorityLevels = parseNumLevels(namespace, conf);
+ this.scheduler = createScheduler(schedulerClass, priorityLevels,
+ namespace, conf);
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
- maxQueueSize, namespace, conf);
+ priorityLevels, maxQueueSize, namespace, conf);
this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
- LOG.info("Using callQueue " + backingClass);
+ LOG.info("Using callQueue: " + backingClass + " scheduler: " +
+ schedulerClass);
+ }
+
+ private static <T extends RpcScheduler> T createScheduler(
+ Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
+ // Used for custom, configurable scheduler
+ try {
+ Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
+ String.class, Configuration.class);
+ return ctor.newInstance(priorityLevels, ns, conf);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(theClass.getName()
+ + " could not be constructed.", e.getCause());
+ } catch (Exception e) {
+ }
+
+ try {
+ Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
+ return ctor.newInstance(priorityLevels);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(theClass.getName()
+ + " could not be constructed.", e.getCause());
+ } catch (Exception e) {
+ }
+
+ // Last attempt
+ try {
+ Constructor<T> ctor = theClass.getDeclaredConstructor();
+ return ctor.newInstance();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(theClass.getName()
+ + " could not be constructed.", e.getCause());
+ } catch (Exception e) {
+ }
+
+ // Nothing worked
+ throw new RuntimeException(theClass.getName() +
+ " could not be constructed.");
}
private <T extends BlockingQueue<E>> T createCallQueueInstance(
- Class<T> theClass, int maxLen, String ns, Configuration conf) {
+ Class<T> theClass, int priorityLevels, int maxLen, String ns,
+ Configuration conf) {
// Used for custom, configurable callqueues
try {
- Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
- Configuration.class);
- return ctor.newInstance(maxLen, ns, conf);
+ Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
+ int.class, String.class, Configuration.class);
+ return ctor.newInstance(priorityLevels, maxLen, ns, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
@@ -110,6 +169,22 @@ public class CallQueueManager<E> {
return clientBackOffEnabled;
}
+ // Based on policy to determine back off current call
+ boolean shouldBackOff(Schedulable e) {
+ return scheduler.shouldBackOff(e);
+ }
+
+ void addResponseTime(String name, int priorityLevel, int queueTime,
+ int processingTime) {
+ scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
+ }
+
+ // This should be only called once per call and cached in the call object
+ // each getPriorityLevel call will increment the counter for the caller
+ int getPriorityLevel(Schedulable e) {
+ return scheduler.getPriorityLevel(e);
+ }
+
/**
* Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the
@@ -147,14 +222,45 @@ public class CallQueueManager<E> {
}
/**
+ * Read the number of levels from the configuration.
+ * This will affect the FairCallQueue's overall capacity.
+ * @throws IllegalArgumentException on invalid queue count
+ */
+ @SuppressWarnings("deprecation")
+ private static int parseNumLevels(String ns, Configuration conf) {
+ // Fair call queue levels (IPC_CALLQUEUE_PRIORITY_LEVELS_KEY)
+ // takes priority over the scheduler level key
+ // (IPC_SCHEDULER_PRIORITY_LEVELS_KEY)
+ int retval = conf.getInt(ns + "." +
+ FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 0);
+ if (retval == 0) { // No FCQ priority level configured
+ retval = conf.getInt(ns + "." +
+ CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
+ CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY);
+ } else {
+ LOG.warn(ns + "." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY +
+ " is deprecated. Please use " + ns + "." +
+ CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY + ".");
+ }
+ if(retval < 1) {
+ throw new IllegalArgumentException("numLevels must be at least 1");
+ }
+ return retval;
+ }
+
+ /**
* Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning.
*/
public synchronized void swapQueue(
+ Class<? extends RpcScheduler> schedulerClass,
Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
String ns, Configuration conf) {
- BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
- ns, conf);
+ int priorityLevels = parseNumLevels(ns, conf);
+ RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
+ ns, conf);
+ BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
+ priorityLevels, maxSize, ns, conf);
// Our current queue becomes the old queue
BlockingQueue<E> oldQ = putRef.get();
@@ -168,6 +274,8 @@ public class CallQueueManager<E> {
// Swap takeRef to handle new calls
takeRef.set(newQ);
+ this.scheduler = newScheduler;
+
LOG.info("Old Queue: " + stringRepr(oldQ) + ", " +
"Replacement: " + stringRepr(newQ));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index a6a14d0..4237339 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -27,17 +27,21 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.util.MBeans;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The decay RPC scheduler counts incoming requests in a map, then
@@ -49,22 +53,28 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Period controls how many milliseconds between each decay sweep.
*/
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY =
+ public static final String IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY =
+ "decay-scheduler.period-ms";
+ public static final long IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT =
+ 5000;
+ @Deprecated
+ public static final String IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY =
"faircallqueue.decay-scheduler.period-ms";
- public static final long IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT =
- 5000L;
/**
* Decay factor controls how much each count is suppressed by on each sweep.
* Valid numbers are > 0 and < 1. Decay factor works in tandem with period
* to control how long the scheduler remembers an identity.
*/
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY =
+ public static final String IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY =
+ "decay-scheduler.decay-factor";
+ public static final double IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT =
+ 0.5;
+ @Deprecated
+ public static final String IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY =
"faircallqueue.decay-scheduler.decay-factor";
- public static final double IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT =
- 0.5;
- /**
+ /**
* Thresholds are specified as integer percentages, and specify which usage
* range each queue will be allocated to. For instance, specifying the list
* 10, 40, 80
@@ -74,15 +84,31 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
* - q1 from 10 up to 40
* - q0 otherwise.
*/
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY =
- "faircallqueue.decay-scheduler.thresholds";
+ public static final String IPC_DECAYSCHEDULER_THRESHOLDS_KEY =
+ "decay-scheduler.thresholds";
+ @Deprecated
+ public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
+ "faircallqueue.decay-scheduler.thresholds";
// Specifies the identity to use when the IdentityProvider cannot handle
// a schedulable.
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
- "IdentityProvider.Unknown";
+ "IdentityProvider.Unknown";
+
+ public static final String
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY =
+ "decay-scheduler.backoff.responsetime.enable";
+ public static final Boolean
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT = false;
+
+ // Specifies the average response time (ms) thresholds of each
+ // level to trigger backoff
+ public static final String
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY =
+ "decay-scheduler.backoff.responsetime.thresholds";
- public static final Log LOG = LogFactory.getLog(DecayRpcScheduler.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DecayRpcScheduler.class);
// Track the number of calls for each schedulable identity
private final ConcurrentHashMap<Object, AtomicLong> callCounts =
@@ -91,6 +117,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Should be the sum of all AtomicLongs in callCounts
private final AtomicLong totalCalls = new AtomicLong();
+ // Track total call count and response time in current decay window
+ private final AtomicLongArray responseTimeCountInCurrWindow;
+ private final AtomicLongArray responseTimeTotalInCurrWindow;
+
+ // Track average response time in previous decay window
+ private final AtomicDoubleArray responseTimeAvgInLastWindow;
+ private final AtomicLongArray responseTimeCountInLastWindow;
+
// Pre-computed scheduling decisions during the decay sweep are
// atomically swapped in as a read-only map
private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
@@ -98,10 +132,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Tune the behavior of the scheduler
private final long decayPeriodMillis; // How long between each tick
- private final double decayFactor; // nextCount = currentCount / decayFactor
- private final int numQueues; // affects scheduling decisions, from 0 to numQueues - 1
+ private final double decayFactor; // nextCount = currentCount * decayFactor
+ private final int numLevels;
private final double[] thresholds;
private final IdentityProvider identityProvider;
+ private final boolean backOffByResponseTimeEnabled;
+ private final long[] backOffResponseTimeThresholds;
/**
* This TimerTask will call decayCurrentCounts until
@@ -132,35 +168,46 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Create a decay scheduler.
- * @param numQueues number of queues to schedule for
+ * @param numLevels number of priority levels
* @param ns config prefix, so that we can configure multiple schedulers
* in a single instance.
* @param conf configuration to use.
*/
- public DecayRpcScheduler(int numQueues, String ns, Configuration conf) {
- if (numQueues < 1) {
- throw new IllegalArgumentException("number of queues must be > 0");
+ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
+ if(numLevels < 1) {
+ throw new IllegalArgumentException("Number of Priority Levels must be " +
+ "at least 1");
}
-
- this.numQueues = numQueues;
+ this.numLevels = numLevels;
this.decayFactor = parseDecayFactor(ns, conf);
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
this.identityProvider = this.parseIdentityProvider(ns, conf);
- this.thresholds = parseThresholds(ns, conf, numQueues);
+ this.thresholds = parseThresholds(ns, conf, numLevels);
+ this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
+ conf);
+ this.backOffResponseTimeThresholds =
+ parseBackOffResponseTimeThreshold(ns, conf, numLevels);
// Setup delay timer
Timer timer = new Timer();
DecayTask task = new DecayTask(this, timer);
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
- MetricsProxy prox = MetricsProxy.getInstance(ns);
+ // Setup response time metrics
+ responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
+ responseTimeCountInCurrWindow = new AtomicLongArray(numLevels);
+ responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels);
+ responseTimeCountInLastWindow = new AtomicLongArray(numLevels);
+
+ MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
prox.setDelegate(this);
}
// Load configs
- private IdentityProvider parseIdentityProvider(String ns, Configuration conf) {
+ private IdentityProvider parseIdentityProvider(String ns,
+ Configuration conf) {
List<IdentityProvider> providers = conf.getInstances(
- ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+ ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
if (providers.size() < 1) {
@@ -174,10 +221,16 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
private static double parseDecayFactor(String ns, Configuration conf) {
double factor = conf.getDouble(ns + "." +
- IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
- IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT
- );
-
+ IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, 0.0);
+ if (factor == 0.0) {
+ factor = conf.getDouble(ns + "." +
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY,
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT);
+ } else if ((factor > 0.0) && (factor < 1)) {
+ LOG.warn(IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY +
+ " is deprecated. Please use " +
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY + ".");
+ }
if (factor <= 0 || factor >= 1) {
throw new IllegalArgumentException("Decay Factor " +
"must be between 0 and 1");
@@ -188,10 +241,17 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
private static long parseDecayPeriodMillis(String ns, Configuration conf) {
long period = conf.getLong(ns + "." +
- IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
- IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT
- );
-
+ IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+ 0);
+ if (period == 0) {
+ period = conf.getLong(ns + "." +
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY,
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT);
+ } else if (period > 0) {
+ LOG.warn((IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY +
+ " is deprecated. Please use " +
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY));
+ }
if (period <= 0) {
throw new IllegalArgumentException("Period millis must be >= 0");
}
@@ -200,15 +260,24 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
}
private static double[] parseThresholds(String ns, Configuration conf,
- int numQueues) {
+ int numLevels) {
int[] percentages = conf.getInts(ns + "." +
- IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY);
+ IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY);
if (percentages.length == 0) {
- return getDefaultThresholds(numQueues);
- } else if (percentages.length != numQueues-1) {
+ percentages = conf.getInts(ns + "." + IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
+ if (percentages.length == 0) {
+ return getDefaultThresholds(numLevels);
+ }
+ } else {
+ LOG.warn(IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY +
+ " is deprecated. Please use " +
+ IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
+ }
+
+ if (percentages.length != numLevels-1) {
throw new IllegalArgumentException("Number of thresholds should be " +
- (numQueues-1) + ". Was: " + percentages.length);
+ (numLevels-1) + ". Was: " + percentages.length);
}
// Convert integer percentages to decimals
@@ -223,14 +292,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Generate default thresholds if user did not specify. Strategy is
* to halve each time, since queue usage tends to be exponential.
- * So if numQueues is 4, we would generate: double[]{0.125, 0.25, 0.5}
+ * So if numLevels is 4, we would generate: double[]{0.125, 0.25, 0.5}
* which specifies the boundaries between each queue's usage.
- * @param numQueues number of queues to compute for
- * @return array of boundaries of length numQueues - 1
+ * @param numLevels number of levels to compute for
+ * @return array of boundaries of length numLevels - 1
*/
- private static double[] getDefaultThresholds(int numQueues) {
- double[] ret = new double[numQueues - 1];
- double div = Math.pow(2, numQueues - 1);
+ private static double[] getDefaultThresholds(int numLevels) {
+ double[] ret = new double[numLevels - 1];
+ double div = Math.pow(2, numLevels - 1);
for (int i = 0; i < ret.length; i++) {
ret[i] = Math.pow(2, i)/div;
@@ -238,39 +307,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return ret;
}
+ private static long[] parseBackOffResponseTimeThreshold(String ns,
+ Configuration conf, int numLevels) {
+ long[] responseTimeThresholds = conf.getTimeDurations(ns + "." +
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY,
+ TimeUnit.MILLISECONDS);
+ // backoff thresholds not specified
+ if (responseTimeThresholds.length == 0) {
+ return getDefaultBackOffResponseTimeThresholds(numLevels);
+ }
+ // backoff thresholds specified but not match with the levels
+ if (responseTimeThresholds.length != numLevels) {
+ throw new IllegalArgumentException(
+ "responseTimeThresholds must match with the number of priority " +
+ "levels");
+ }
+ // invalid thresholds
+ for (long responseTimeThreshold: responseTimeThresholds) {
+ if (responseTimeThreshold <= 0) {
+ throw new IllegalArgumentException(
+ "responseTimeThreshold millis must be >= 0");
+ }
+ }
+ return responseTimeThresholds;
+ }
+
+ // 10s for level 0, 20s for level 1, 30s for level 2, ...
+ private static long[] getDefaultBackOffResponseTimeThresholds(int numLevels) {
+ long[] ret = new long[numLevels];
+ for (int i = 0; i < ret.length; i++) {
+ ret[i] = 10000*(i+1);
+ }
+ return ret;
+ }
+
+ private static Boolean parseBackOffByResponseTimeEnabled(String ns,
+ Configuration conf) {
+ return conf.getBoolean(ns + "." +
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT);
+ }
+
/**
* Decay the stored counts for each user and clean as necessary.
* This method should be called periodically in order to keep
* counts current.
*/
private void decayCurrentCounts() {
- long total = 0;
- Iterator<Map.Entry<Object, AtomicLong>> it =
- callCounts.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<Object, AtomicLong> entry = it.next();
- AtomicLong count = entry.getValue();
-
- // Compute the next value by reducing it by the decayFactor
- long currentValue = count.get();
- long nextValue = (long)(currentValue * decayFactor);
- total += nextValue;
- count.set(nextValue);
-
- if (nextValue == 0) {
- // We will clean up unused keys here. An interesting optimization might
- // be to have an upper bound on keyspace in callCounts and only
- // clean once we pass it.
- it.remove();
+ try {
+ long total = 0;
+ Iterator<Map.Entry<Object, AtomicLong>> it =
+ callCounts.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Object, AtomicLong> entry = it.next();
+ AtomicLong count = entry.getValue();
+
+ // Compute the next value by reducing it by the decayFactor
+ long currentValue = count.get();
+ long nextValue = (long) (currentValue * decayFactor);
+ total += nextValue;
+ count.set(nextValue);
+
+ if (nextValue == 0) {
+ // We will clean up unused keys here. An interesting optimization
+ // might be to have an upper bound on keyspace in callCounts and only
+ // clean once we pass it.
+ it.remove();
+ }
}
- }
- // Update the total so that we remain in sync
- totalCalls.set(total);
+ // Update the total so that we remain in sync
+ totalCalls.set(total);
+
+ // Now refresh the cache of scheduling decisions
+ recomputeScheduleCache();
- // Now refresh the cache of scheduling decisions
- recomputeScheduleCache();
+ // Update average response time with decay
+ updateAverageResponseTime(true);
+ } catch (Exception ex) {
+ LOG.error("decayCurrentCounts exception: " +
+ ExceptionUtils.getFullStackTrace(ex));
+ throw ex;
+ }
}
/**
@@ -324,7 +443,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
/**
* Given the number of occurrences, compute a scheduling decision.
* @param occurrences how many occurrences
- * @return scheduling decision from 0 to numQueues - 1
+ * @return scheduling decision from 0 to numLevels - 1
*/
private int computePriorityLevel(long occurrences) {
long totalCallSnapshot = totalCalls.get();
@@ -334,14 +453,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
proportion = (double) occurrences / totalCallSnapshot;
}
- // Start with low priority queues, since they will be most common
- for(int i = (numQueues - 1); i > 0; i--) {
+ // Start with low priority levels, since they will be most common
+ for(int i = (numLevels - 1); i > 0; i--) {
if (proportion >= this.thresholds[i - 1]) {
- return i; // We've found our queue number
+ return i; // We've found our level number
}
}
- // If we get this far, we're at queue 0
+ // If we get this far, we're at level 0
return 0;
}
@@ -349,7 +468,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
* Returns the priority level for a given identity by first trying the cache,
* then computing it.
* @param identity an object responding to toString and hashCode
- * @return integer scheduling decision from 0 to numQueues - 1
+ * @return integer scheduling decision from 0 to numLevels - 1
*/
private int cachedOrComputedPriorityLevel(Object identity) {
try {
@@ -360,22 +479,29 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
if (scheduleCache != null) {
Integer priority = scheduleCache.get(identity);
if (priority != null) {
+ LOG.debug("Cache priority for: {} with priority: {}", identity,
+ priority);
return priority;
}
}
// Cache was no good, compute it
- return computePriorityLevel(occurrences);
+ int priority = computePriorityLevel(occurrences);
+ LOG.debug("compute priority for " + identity + " priority " + priority);
+ return priority;
+
} catch (InterruptedException ie) {
- LOG.warn("Caught InterruptedException, returning low priority queue");
- return numQueues - 1;
+ LOG.warn("Caught InterruptedException, returning low priority level");
+ LOG.debug("Fallback priority for: {} with priority: {}", identity,
+ numLevels - 1);
+ return numLevels - 1;
}
}
/**
* Compute the appropriate priority for a schedulable based on past requests.
* @param obj the schedulable obj to query and remember
- * @return the queue index which we recommend scheduling in
+ * @return the level index which we recommend scheduling in
*/
@Override
public int getPriorityLevel(Schedulable obj) {
@@ -389,6 +515,73 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return cachedOrComputedPriorityLevel(identity);
}
+ /**
+  * Decides whether the caller of the given call should be asked to back off,
+  * using the average response times cached for the last window.
+  * Always returns false when back off by response time is disabled.
+  *
+  * @param obj the call under consideration; its already-assigned priority
+  *            level bounds the range of levels that are checked
+  * @return true if any level from 0 up to and including the call's own level
+  *         has a cached average response time above its back off threshold
+  */
+ @Override
+ public boolean shouldBackOff(Schedulable obj) {
+   if (!backOffByResponseTimeEnabled) {
+     return false;
+   }
+   final int priorityLevel = obj.getPriorityLevel();
+   if (LOG.isDebugEnabled()) {
+     double[] responseTimes = getAverageResponseTime();
+     LOG.debug("Current Caller: {} Priority: {} ",
+         obj.getUserGroupInformation().getUserName(),
+         priorityLevel);
+     for (int i = 0; i < numLevels; i++) {
+       LOG.debug("Queue: {} responseTime: {} backoffThreshold: {}", i,
+           responseTimes[i], backOffResponseTimeThresholds[i]);
+     }
+   }
+   // High priority rpc over threshold triggers back off of low priority rpc
+   for (int i = 0; i <= priorityLevel; i++) {
+     if (responseTimeAvgInLastWindow.get(i) >
+         backOffResponseTimeThresholds[i]) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Adds one completed call to the current-window response time accumulators
+  * of the given priority level.
+  *
+  * @param name name of the call, used only for debug logging
+  * @param priorityLevel index of the level whose accumulators are updated
+  * @param queueTime time the call spent queued
+  * @param processingTime time spent processing the call
+  */
+ @Override
+ public void addResponseTime(String name, int priorityLevel, int queueTime,
+     int processingTime) {
+   // The response time of a call is its queue time plus processing time
+   final int responseTime = queueTime + processingTime;
+   responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
+   responseTimeTotalInCurrWindow.getAndAdd(priorityLevel, responseTime);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
+         "processingTime: {} ", name, priorityLevel, queueTime,
+         processingTime);
+   }
+ }
+
+ /**
+  * Updates the cached average response time of every level at the end of a
+  * decay window, then clears the current-window accumulators.
+  *
+  * @param enableDecay if true, a level whose previously cached average is
+  *        positive is set to decayFactor * previous + current average;
+  *        otherwise the cached value is simply the current average
+  */
+ void updateAverageResponseTime(boolean enableDecay) {
+   for (int level = 0; level < numLevels; level++) {
+     final long windowTotal = responseTimeTotalInCurrWindow.get(level);
+     final long windowCount = responseTimeCountInCurrWindow.get(level);
+     // A level that saw no calls in this window averages to zero
+     final double windowAvg =
+         windowCount > 0 ? (double) windowTotal / windowCount : 0;
+     final double previousAvg = responseTimeAvgInLastWindow.get(level);
+     final boolean applyDecay = enableDecay && previousAvg > 0.0;
+     responseTimeAvgInLastWindow.set(level,
+         applyDecay ? decayFactor * previousAvg + windowAvg : windowAvg);
+     responseTimeCountInLastWindow.set(level, windowCount);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("updateAverageResponseTime queue: {} Average: {} Count: {}",
+           level, windowAvg, windowCount);
+     }
+     // Reset for next decay window
+     responseTimeTotalInCurrWindow.set(level, 0);
+     responseTimeCountInCurrWindow.set(level, 0);
+   }
+ }
+
// For testing
@VisibleForTesting
public double getDecayFactor() { return decayFactor; }
@@ -429,16 +622,21 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
// Weakref for delegate, so we don't retain it forever if it can be GC'd
private WeakReference<DecayRpcScheduler> delegate;
+ private double[] averageResponseTimeDefault;
+ private long[] callCountInLastWindowDefault;
- private MetricsProxy(String namespace) {
+ private MetricsProxy(String namespace, int numLevels) {
+ averageResponseTimeDefault = new double[numLevels];
+ callCountInLastWindowDefault = new long[numLevels];
MBeans.register(namespace, "DecayRpcScheduler", this);
}
- public static synchronized MetricsProxy getInstance(String namespace) {
+ public static synchronized MetricsProxy getInstance(String namespace,
+ int numLevels) {
MetricsProxy mp = INSTANCES.get(namespace);
if (mp == null) {
// We must create one
- mp = new MetricsProxy(namespace);
+ mp = new MetricsProxy(namespace, numLevels);
INSTANCES.put(namespace, mp);
}
return mp;
@@ -487,6 +685,25 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return scheduler.getTotalCallVolume();
}
}
+
+ /**
+  * Returns the average response times reported by the delegate scheduler,
+  * or the default array allocated in the constructor when the weak
+  * reference no longer yields a delegate.
+  */
+ @Override
+ public double[] getAverageResponseTime() {
+   final DecayRpcScheduler current = delegate.get();
+   return current != null
+       ? current.getAverageResponseTime()
+       : averageResponseTimeDefault;
+ }
+
+ /**
+  * Returns the last-window response time counts reported by the delegate
+  * scheduler, or the default array allocated in the constructor when the
+  * weak reference no longer yields a delegate.
+  */
+ @Override
+ public long[] getResponseTimeCountInLastWindow() {
+   DecayRpcScheduler scheduler = delegate.get();
+   if (scheduler == null) {
+     return callCountInLastWindowDefault;
+   } else {
+     return scheduler.getResponseTimeCountInLastWindow();
+   }
+ }
}
public int getUniqueIdentityCount() {
@@ -497,6 +714,23 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
return totalCalls.get();
}
+ /**
+  * Returns a newly allocated copy of the per-level response time counts
+  * recorded for the last window.
+  *
+  * @return one count per priority level, indexed by level
+  */
+ @Override
+ public long[] getResponseTimeCountInLastWindow() {
+   long[] ret = new long[responseTimeCountInLastWindow.length()];
+   for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
+     ret[i] = responseTimeCountInLastWindow.get(i);
+   }
+   return ret;
+ }
+
+ /**
+  * Returns a newly allocated copy of the per-level average response times
+  * cached for the last window.
+  *
+  * @return one average per priority level, indexed by level
+  */
+ @Override
+ public double[] getAverageResponseTime() {
+   final int levels = responseTimeAvgInLastWindow.length();
+   final double[] snapshot = new double[levels];
+   for (int level = 0; level < levels; level++) {
+     snapshot[level] = responseTimeAvgInLastWindow.get(level);
+   }
+   return snapshot;
+ }
+
public String getSchedulingDecisionSummary() {
Map<Object, Integer> decisions = scheduleCacheRef.get();
if (decisions == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
index 3481f19..fab9b93 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
@@ -27,4 +27,6 @@ public interface DecayRpcSchedulerMXBean {
String getCallVolumeSummary();
int getUniqueIdentityCount();
long getTotalCallVolume();
+ double[] getAverageResponseTime();
+ long[] getResponseTimeCountInLastWindow();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
new file mode 100644
index 0000000..08f74d4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * No op default RPC scheduler: every call gets priority level 0, back off
+ * is never requested and reported response times are discarded.
+ */
+public class DefaultRpcScheduler implements RpcScheduler {
+
+  /**
+   * Creates the scheduler. None of the arguments are used by this
+   * implementation.
+   */
+  public DefaultRpcScheduler(int priorityLevels, String namespace,
+      Configuration conf) {
+  }
+
+  /** Always assigns priority level 0. */
+  @Override
+  public int getPriorityLevel(Schedulable obj) {
+    return 0;
+  }
+
+  /** Never asks the caller to back off. */
+  @Override
+  public boolean shouldBackOff(Schedulable obj) {
+    return false;
+  }
+
+  /** Ignores the reported response time. */
+  @Override
+  public void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 0b56243..435c454 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -44,8 +44,9 @@ import org.apache.hadoop.metrics2.util.MBeans;
*/
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
implements BlockingQueue<E> {
- // Configuration Keys
+ @Deprecated
public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
+ @Deprecated
public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY =
"faircallqueue.priority-levels";
@@ -66,9 +67,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
}
}
- /* Scheduler picks which queue to place in */
- private RpcScheduler scheduler;
-
/* Multiplexer picks which queue to draw from */
private RpcMultiplexer multiplexer;
@@ -83,8 +81,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
* Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum
* capacity of `capacity` and a maximum capacity of `capacity * number_queues`
*/
- public FairCallQueue(int capacity, String ns, Configuration conf) {
- int numQueues = parseNumQueues(ns, conf);
+ public FairCallQueue(int priorityLevels, int capacity, String ns,
+ Configuration conf) {
+ if(priorityLevels < 1) {
+ throw new IllegalArgumentException("Number of Priority Levels must be " +
+ "at least 1");
+ }
+ int numQueues = priorityLevels;
LOG.info("FairCallQueue is in use with " + numQueues + " queues.");
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
@@ -95,29 +98,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
this.overflowedCalls.add(new AtomicLong(0));
}
- this.scheduler = new DecayRpcScheduler(numQueues, ns, conf);
this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
-
// Make this the active source of metrics
MetricsProxy mp = MetricsProxy.getInstance(ns);
mp.setDelegate(this);
}
/**
- * Read the number of queues from the configuration.
- * This will affect the FairCallQueue's overall capacity.
- * @throws IllegalArgumentException on invalid queue count
- */
- private static int parseNumQueues(String ns, Configuration conf) {
- int retval = conf.getInt(ns + "." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY,
- IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT);
- if(retval < 1) {
- throw new IllegalArgumentException("numQueues must be at least 1");
- }
- return retval;
- }
-
- /**
* Returns the first non-empty queue with equal or lesser priority
* than <i>startIdx</i>. Wraps around, searching a maximum of N
* queues, where N is this.queues.size().
@@ -144,7 +131,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/**
* Put and offer follow the same pattern:
- * 1. Get a priorityLevel from the scheduler
+ * 1. Get the priorityLevel that the scheduler assigned to the call
* 2. Get the nth sub-queue matching this priorityLevel
* 3. delegate the call to this sub-queue.
*
@@ -154,7 +141,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public void put(E e) throws InterruptedException {
- int priorityLevel = scheduler.getPriorityLevel(e);
+ int priorityLevel = e.getPriorityLevel();
final int numLevels = this.queues.size();
while (true) {
@@ -185,7 +172,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- int priorityLevel = scheduler.getPriorityLevel(e);
+ int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e, timeout, unit);
@@ -196,7 +183,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
@Override
public boolean offer(E e) {
- int priorityLevel = scheduler.getPriorityLevel(e);
+ int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e);
@@ -436,12 +423,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
return calls;
}
- // For testing
- @VisibleForTesting
- public void setScheduler(RpcScheduler newScheduler) {
- this.scheduler = newScheduler;
- }
-
@VisibleForTesting
public void setMultiplexer(RpcMultiplexer newMux) {
this.multiplexer = newMux;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 692d2b6..071e2e8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -654,13 +654,7 @@ public class ProtobufRpcEngine implements RpcEngine {
String detailedMetricsName = (exception == null) ?
methodName :
exception.getClass().getSimpleName();
- server.rpcMetrics.addRpcQueueTime(qTime);
- server.rpcMetrics.addRpcProcessingTime(processingTime);
- server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
- processingTime);
- if (server.isLogSlowRPC()) {
- server.logSlowRpcCalls(methodName, processingTime);
- }
+ server.updateMetrics(detailedMetricsName, qTime, processingTime);
}
return new RpcResponseWrapper(result);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index a155706..6f93b22 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -19,11 +19,17 @@
package org.apache.hadoop.ipc;
/**
- * Implement this interface to be used for RPC scheduling in the fair call queues.
+ * Implement this interface to be used for RPC scheduling and backoff.
+ *
*/
public interface RpcScheduler {
/**
* Returns a priority level greater than or equal to zero as a hint for scheduling.
*/
int getPriorityLevel(Schedulable obj);
+
+ boolean shouldBackOff(Schedulable obj);
+
+ void addResponseTime(String name, int priorityLevel, int queueTime,
+ int processingTime);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
index 38f3518..3b28d85 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
@@ -18,11 +18,8 @@
package org.apache.hadoop.ipc;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.io.Writable;
/**
* Interface which allows extracting information necessary to
@@ -31,4 +28,6 @@ import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
public interface Schedulable {
public UserGroupInformation getUserGroupInformation();
+
+ int getPriorityLevel();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 1d92865..eb28ad5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -396,6 +396,15 @@ public abstract class Server {
return CurCall.get() != null;
}
+ /**
+  * Returns the priority level assigned to the call held in CurCall.
+  *
+  * @return the current call's priority level, or 0 when there is no
+  *         current call
+  */
+ public static int getPriorityLevel() {
+   final Call current = CurCall.get();
+   if (current == null) {
+     return 0;
+   }
+   return current.getPriorityLevel();
+ }
+
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
@@ -482,6 +491,18 @@ public abstract class Server {
}
}
+ /**
+  * Records the timing of a finished call: updates the RPC and detailed RPC
+  * metrics, reports the response time to the call queue under the current
+  * call's priority level, and logs the call when slow RPC logging is on.
+  *
+  * @param name metrics name of the call
+  * @param queueTime time the call spent queued
+  * @param processingTime time spent processing the call
+  */
+ void updateMetrics(String name, int queueTime, int processingTime) {
+   rpcMetrics.addRpcQueueTime(queueTime);
+   rpcMetrics.addRpcProcessingTime(processingTime);
+   rpcDetailedMetrics.addProcessingTime(name, processingTime);
+
+   final int priorityLevel = getPriorityLevel();
+   callQueue.addResponseTime(name, priorityLevel, queueTime,
+       processingTime);
+
+   if (isLogSlowRPC()) {
+     logSlowRpcCalls(name, processingTime);
+   }
+ }
+
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
@@ -578,6 +599,10 @@ public abstract class Server {
return serviceAuthorizationManager;
}
+ /**
+  * Builds the configuration key prefix for this server: the IPC namespace,
+  * a dot, and the port.
+  */
+ private String getQueueClassPrefix() {
+   final String namespace = CommonConfigurationKeys.IPC_NAMESPACE;
+   return namespace + "." + port;
+ }
+
static Class<? extends BlockingQueue<Call>> getQueueClass(
String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
@@ -585,8 +610,29 @@ public abstract class Server {
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}
- private String getQueueClassPrefix() {
- return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
+ /**
+  * Resolves the RPC scheduler class configured under the given prefix.
+  * When no scheduler is configured but the call queue under the same prefix
+  * is FairCallQueue, the configuration is patched to use DecayRpcScheduler.
+  * In every other case the lookup falls back to DefaultRpcScheduler.
+  *
+  * @param prefix configuration key prefix of the server
+  * @param conf configuration to read; may be modified as described above
+  * @return the scheduler class to instantiate
+  */
+ static Class<? extends RpcScheduler> getSchedulerClass(
+     String prefix, Configuration conf) {
+   String schedulerKeyname = prefix + "." + CommonConfigurationKeys
+       .IPC_SCHEDULER_IMPL_KEY;
+   // Patch the configuration for legacy fcq configuration that does not have
+   // a separate scheduler setting
+   if (conf.getClass(schedulerKeyname, null) == null) {
+     String queueKeyName = prefix + "." + CommonConfigurationKeys
+         .IPC_CALLQUEUE_IMPL_KEY;
+     Class<?> queueClass = conf.getClass(queueKeyName, null);
+     // getCanonicalName() returns null for anonymous and local classes, so
+     // equals() is invoked on FairCallQueue's name, which is never null.
+     if (queueClass != null
+         && FairCallQueue.class.getCanonicalName().equals(
+             queueClass.getCanonicalName())) {
+       conf.setClass(schedulerKeyname, DecayRpcScheduler.class,
+           RpcScheduler.class);
+     }
+   }
+   Class<?> schedulerClass = conf.getClass(schedulerKeyname,
+       DefaultRpcScheduler.class);
+
+   return CallQueueManager.convertSchedulerClass(schedulerClass);
}
/*
@@ -595,7 +641,8 @@ public abstract class Server {
public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue
String prefix = getQueueClassPrefix();
- callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
+ callQueue.swapQueue(getSchedulerClass(prefix, conf),
+ getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
}
/**
@@ -623,6 +670,8 @@ public abstract class Server {
private final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context
+ private int priorityLevel;
+ // the priority level assigned by scheduler, 0 by default
private Call(Call call) {
this(call.callId, call.retryCount, call.rpcRequest, call.connection,
@@ -709,7 +758,16 @@ public abstract class Server {
@Override
public UserGroupInformation getUserGroupInformation() {
return connection.user;
- }
+ }
+
+ /**
+  * Returns the priority level stored on this call by setPriorityLevel;
+  * the field comment states it is 0 by default.
+  */
+ @Override
+ public int getPriorityLevel() {
+ return this.priorityLevel;
+ }
+
+ /**
+  * Stores the priority level assigned to this call.
+  *
+  * @param priorityLevel the level later returned by getPriorityLevel
+  */
+ public void setPriorityLevel(int priorityLevel) {
+ this.priorityLevel = priorityLevel;
+ }
}
/** Listens on the socket. Creates jobs for the handler threads*/
@@ -2151,6 +2209,9 @@ public abstract class Server {
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext);
+ // Save the priority level assignment by the scheduler
+ call.setPriorityLevel(callQueue.getPriorityLevel(call));
+
if (callQueue.isClientBackoffEnabled()) {
// if RPC queue is full, we will ask the RPC client to back off by
// throwing RetriableException. Whether RPC client will honor
@@ -2166,9 +2227,10 @@ public abstract class Server {
private void queueRequestOrAskClientToBackOff(Call call)
throws WrappedRpcServerException, InterruptedException {
- // If rpc queue is full, we will ask the client to back off.
- boolean isCallQueued = callQueue.offer(call);
- if (!isCallQueued) {
+ // If rpc scheduler indicates back off based on performance
+ // degradation such as response time or rpc queue is full,
+ // we will ask the client to back off.
+ if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
rpcMetrics.incrClientBackoff();
RetriableException retriableException =
new RetriableException("Server is too busy.");
@@ -2513,6 +2575,7 @@ public abstract class Server {
// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
+ getSchedulerClass(prefix, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index a1db6be..a9dbb41 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -502,13 +501,12 @@ public class WritableRpcEngine implements RpcEngine {
}
}
}
-
- // Invoke the protocol method
- long startTime = Time.now();
- int qTime = (int) (startTime-receivedTime);
- Exception exception = null;
- try {
+ // Invoke the protocol method
+ long startTime = Time.now();
+ int qTime = (int) (startTime-receivedTime);
+ Exception exception = null;
+ try {
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
@@ -539,27 +537,20 @@ public class WritableRpcEngine implements RpcEngine {
exception = ioe;
throw ioe;
} finally {
- int processingTime = (int) (Time.now() - startTime);
- if (LOG.isDebugEnabled()) {
- String msg = "Served: " + call.getMethodName() +
- " queueTime= " + qTime +
- " procesingTime= " + processingTime;
- if (exception != null) {
- msg += " exception= " + exception.getClass().getSimpleName();
- }
- LOG.debug(msg);
- }
- String detailedMetricsName = (exception == null) ?
- call.getMethodName() :
- exception.getClass().getSimpleName();
- server.rpcMetrics.addRpcQueueTime(qTime);
- server.rpcMetrics.addRpcProcessingTime(processingTime);
- server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
- processingTime);
- if (server.isLogSlowRPC()) {
- server.logSlowRpcCalls(call.getMethodName(), processingTime);
+ int processingTime = (int) (Time.now() - startTime);
+ if (LOG.isDebugEnabled()) {
+ String msg = "Served: " + call.getMethodName() +
+ " queueTime= " + qTime + " procesingTime= " + processingTime;
+ if (exception != null) {
+ msg += " exception= " + exception.getClass().getSimpleName();
+ }
+ LOG.debug(msg);
}
- }
+ String detailedMetricsName = (exception == null) ?
+ call.getMethodName() :
+ exception.getClass().getSimpleName();
+ server.updateMetrics(detailedMetricsName, qTime, processingTime);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 4d659ac..af9ce1b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -27,17 +27,37 @@ import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager;
+ private Configuration conf = new Configuration();
- public class FakeCall {
+ public class FakeCall implements Schedulable {
public final int tag; // Can be used for unique identification
-
+ private int priorityLevel;
+ UserGroupInformation fakeUgi = UserGroupInformation.createRemoteUser
+ ("fakeUser");
public FakeCall(int tag) {
this.tag = tag;
}
+
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return fakeUgi;
+ }
+
+ @Override
+ public int getPriorityLevel() {
+ return priorityLevel;
+ }
+
+ public void setPriorityLevel(int level) {
+ this.priorityLevel = level;
+ }
}
/**
@@ -62,7 +82,9 @@ public class TestCallQueueManager {
try {
// Fill up to max (which is infinite if maxCalls < 0)
while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) {
- cq.put(new FakeCall(this.tag));
+ FakeCall call = new FakeCall(this.tag);
+ call.setPriorityLevel(cq.getPriorityLevel(call));
+ cq.put(call);
callsAdded++;
}
} catch (InterruptedException e) {
@@ -135,7 +157,7 @@ public class TestCallQueueManager {
t.start();
t.join(100);
- assertEquals(putter.callsAdded, numberOfPuts);
+ assertEquals(numberOfPuts, putter.callsAdded);
t.interrupt();
}
@@ -143,23 +165,90 @@ public class TestCallQueueManager {
private static final Class<? extends BlockingQueue<FakeCall>> queueClass
= CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
+ private static final Class<? extends RpcScheduler> schedulerClass
+ = CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class);
+
@Test
public void testCallQueueCapacity() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+ 10, "", conf);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
}
@Test
public void testEmptyConsume() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+ 10, "", conf);
assertCanTake(manager, 0, 1); // Fails since it's empty
}
+ /**
+  * Looks up the call queue class configured under the given prefix,
+  * falling back to LinkedBlockingQueue, and converts it to a queue class
+  * of FakeCall elements.
+  */
+ static Class<? extends BlockingQueue<FakeCall>> getQueueClass(
+     String prefix, Configuration conf) {
+   final String queueKey =
+       prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
+   final Class<?> configured =
+       conf.getClass(queueKey, LinkedBlockingQueue.class);
+   return CallQueueManager.convertQueueClass(configured, FakeCall.class);
+ }
+
+ /**
+  * Verifies that a configuration naming only FairCallQueue, with no
+  * scheduler setting, still resolves to DecayRpcScheduler and yields a
+  * usable CallQueueManager.
+  */
+ @Test
+ public void testFcqBackwardCompatibility() throws InterruptedException {
+   // Test BackwardCompatibility to ensure existing FCQ deployment still
+   // work without explicitly specifying DecayRpcScheduler
+   Configuration conf = new Configuration();
+   final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+
+   final String queueClassName = "org.apache.hadoop.ipc.FairCallQueue";
+   conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
+       queueClassName);
+
+   // Specify only Fair Call Queue without a scheduler
+   // Ensure the DecayScheduler will be added to avoid breaking.
+   Class<? extends RpcScheduler> scheduler = Server.getSchedulerClass(ns,
+       conf);
+   // assertEquals reports the actual class name when the check fails
+   assertEquals("org.apache.hadoop.ipc.DecayRpcScheduler",
+       scheduler.getCanonicalName());
+
+   Class<? extends BlockingQueue<FakeCall>> queue = getQueueClass(ns, conf);
+   assertEquals(queueClassName, queue.getCanonicalName());
+
+   manager = new CallQueueManager<FakeCall>(queue, scheduler, false,
+       2, "", conf);
+
+   // Default FCQ has 4 levels and the max capacity is 2 x 4
+   assertCanPut(manager, 3, 3);
+ }
+
+ /**
+  * Verifies that DecayRpcScheduler can be configured on its own and works
+  * with the default LinkedBlockingQueue call queue.
+  */
+ @Test
+ public void testSchedulerWithoutFCQ() throws InterruptedException {
+   Configuration conf = new Configuration();
+   // Test DecayedRpcScheduler without FCQ
+   // Ensure the default LinkedBlockingQueue can work with DecayedRpcScheduler
+   final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+   final String schedulerClassName = "org.apache.hadoop.ipc.DecayRpcScheduler";
+   conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+       schedulerClassName);
+
+   Class<? extends BlockingQueue<FakeCall>> queue = getQueueClass(ns, conf);
+   // assertEquals reports the actual class name when the check fails
+   assertEquals("java.util.concurrent." +
+       "LinkedBlockingQueue", queue.getCanonicalName());
+
+   manager = new CallQueueManager<FakeCall>(queue,
+       Server.getSchedulerClass(ns, conf), false,
+       3, "", conf);
+
+   // LinkedBlockingQueue with a capacity of 3 can put 3 calls
+   assertCanPut(manager, 3, 3);
+   // LinkedBlockingQueue with a capacity of 3 can't put 1 more call
+   assertCanPut(manager, 0, 1);
+ }
+
@Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+ 5000, "", conf);
ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>();
@@ -188,7 +277,7 @@ public class TestCallQueueManager {
Thread.sleep(500);
for (int i=0; i < 5; i++) {
- manager.swapQueue(queueClass, 5000, "", null);
+ manager.swapQueue(schedulerClass, queueClass, 5000, "", conf);
}
// Stop the producers
@@ -223,24 +312,50 @@ public class TestCallQueueManager {
}
public static class ExceptionFakeCall {
-
public ExceptionFakeCall() {
- throw new IllegalArgumentException("Exception caused by constructor.!!");
+ throw new IllegalArgumentException("Exception caused by call queue " +
+ "constructor.!!");
+ }
+ }
+
+ public static class ExceptionFakeScheduler {
+ public ExceptionFakeScheduler() {
+ throw new IllegalArgumentException("Exception caused by " +
+ "scheduler constructor.!!");
}
}
- private static final Class<? extends BlockingQueue<ExceptionFakeCall>> exceptionQueueClass = CallQueueManager
- .convertQueueClass(ExceptionFakeCall.class, ExceptionFakeCall.class);
+ private static final Class<? extends RpcScheduler>
+ exceptionSchedulerClass = CallQueueManager.convertSchedulerClass(
+ ExceptionFakeScheduler.class);
+
+ private static final Class<? extends BlockingQueue<ExceptionFakeCall>>
+ exceptionQueueClass = CallQueueManager.convertQueueClass(
+ ExceptionFakeCall.class, ExceptionFakeCall.class);
+
+ @Test
+ public void testCallQueueConstructorException() throws InterruptedException {
+ try {
+ new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass,
+ schedulerClass, false, 10, "", new Configuration());
+ fail();
+ } catch (RuntimeException re) {
+ assertTrue(re.getCause() instanceof IllegalArgumentException);
+ assertEquals("Exception caused by call queue constructor.!!", re
+ .getCause()
+ .getMessage());
+ }
+ }
@Test
- public void testInvocationException() throws InterruptedException {
+ public void testSchedulerConstructorException() throws InterruptedException {
try {
- new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass, false, 10,
- "", null);
+ new CallQueueManager<FakeCall>(queueClass, exceptionSchedulerClass,
+ false, 10, "", new Configuration());
fail();
} catch (RuntimeException re) {
assertTrue(re.getCause() instanceof IllegalArgumentException);
- assertEquals("Exception caused by constructor.!!", re.getCause()
+ assertEquals("Exception caused by scheduler constructor.!!", re.getCause()
.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index edc3b00..0b0408c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -18,12 +18,11 @@
package org.apache.hadoop.ipc;
+import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Arrays;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,8 +30,6 @@ import static org.mockito.Mockito.when;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class);
@@ -57,30 +54,32 @@ public class TestDecayRpcScheduler {
}
@Test
+ @SuppressWarnings("deprecation")
public void testParsePeriod() {
// By default
scheduler = new DecayRpcScheduler(1, "", new Configuration());
- assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT,
+ assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());
// Custom
Configuration conf = new Configuration();
- conf.setLong("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
+ conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}
@Test
+ @SuppressWarnings("deprecation")
public void testParseFactor() {
// Default
scheduler = new DecayRpcScheduler(1, "", new Configuration());
- assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT,
+ assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
scheduler.getDecayFactor(), 0.00001);
// Custom
Configuration conf = new Configuration();
- conf.set("prefix." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
+ conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
"0.125");
scheduler = new DecayRpcScheduler(1, "prefix", conf);
assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
@@ -94,6 +93,7 @@ public class TestDecayRpcScheduler {
}
@Test
+ @SuppressWarnings("deprecation")
public void testParseThresholds() {
// Defaults vary by number of queues
Configuration conf = new Configuration();
@@ -111,16 +111,17 @@ public class TestDecayRpcScheduler {
// Custom
conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
"1, 10, 20, 50, 85");
scheduler = new DecayRpcScheduler(6, "ns", conf);
assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
}
@Test
+ @SuppressWarnings("deprecation")
public void testAccumulate() {
Configuration conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
@@ -138,10 +139,11 @@ public class TestDecayRpcScheduler {
}
@Test
- public void testDecay() {
+ @SuppressWarnings("deprecation")
+ public void testDecay() throws Exception {
Configuration conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(0, scheduler.getTotalCallSnapshot());
@@ -150,6 +152,8 @@ public class TestDecayRpcScheduler {
scheduler.getPriorityLevel(mockCall("A"));
}
+ sleep(1000);
+
for (int i = 0; i < 8; i++) {
scheduler.getPriorityLevel(mockCall("B"));
}
@@ -184,10 +188,11 @@ public class TestDecayRpcScheduler {
}
@Test
+ @SuppressWarnings("deprecation")
public void testPriority() {
Configuration conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
"25, 50, 75");
scheduler = new DecayRpcScheduler(4, "ns", conf);
@@ -204,10 +209,11 @@ public class TestDecayRpcScheduler {
}
@Test(timeout=2000)
+ @SuppressWarnings("deprecation")
public void testPeriodic() throws InterruptedException {
Configuration conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "10");
- conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
+ conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(10, scheduler.getDecayPeriodMillis());
@@ -219,7 +225,7 @@ public class TestDecayRpcScheduler {
// It should eventually decay to zero
while (scheduler.getTotalCallSnapshot() > 0) {
- Thread.sleep(10);
+ sleep(10);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 2694ba3..4a8ad3b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -37,21 +37,24 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
import org.mockito.Matchers;
-import static org.apache.hadoop.ipc.FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY;
-
public class TestFairCallQueue extends TestCase {
private FairCallQueue<Schedulable> fcq;
- private Schedulable mockCall(String id) {
+ private Schedulable mockCall(String id, int priority) {
Schedulable mockCall = mock(Schedulable.class);
UserGroupInformation ugi = mock(UserGroupInformation.class);
when(ugi.getUserName()).thenReturn(id);
when(mockCall.getUserGroupInformation()).thenReturn(ugi);
+ when(mockCall.getPriorityLevel()).thenReturn(priority);
return mockCall;
}
+ private Schedulable mockCall(String id) {
+ return mockCall(id, 0);
+ }
+
// A scheduler which always schedules into priority zero
private RpcScheduler alwaysZeroScheduler;
{
@@ -60,11 +63,12 @@ public class TestFairCallQueue extends TestCase {
alwaysZeroScheduler = sched;
}
+ @SuppressWarnings("deprecation")
public void setUp() {
Configuration conf = new Configuration();
- conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+ conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
- fcq = new FairCallQueue<Schedulable>(5, "ns", conf);
+ fcq = new FairCallQueue<Schedulable>(2, 5, "ns", conf);
}
//
@@ -85,7 +89,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferSucceeds() {
- fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) {
// We can fit 10 calls
@@ -96,7 +99,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferFailsWhenFull() {
- fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
assertFalse(fcq.offer(mockCall("c"))); // It's full
@@ -107,11 +109,10 @@ public class TestFairCallQueue extends TestCase {
public void testOfferSucceedsWhenScheduledLowPriority() {
// Scheduler will schedule into queue 0 x 5, then queue 1
RpcScheduler sched = mock(RpcScheduler.class);
- when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 0);
- fcq.setScheduler(sched);
- for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
+ int mockedPriorities[] = {0, 0, 0, 0, 0, 1, 0};
+ for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c", mockedPriorities[i]))); }
- assertTrue(fcq.offer(mockCall("c")));
+ assertTrue(fcq.offer(mockCall("c", mockedPriorities[5])));
assertEquals(6, fcq.size());
}
@@ -121,7 +122,7 @@ public class TestFairCallQueue extends TestCase {
}
public void testPeekNonDestructive() {
- Schedulable call = mockCall("c");
+ Schedulable call = mockCall("c", 0);
assertTrue(fcq.offer(call));
assertEquals(call, fcq.peek());
@@ -130,8 +131,8 @@ public class TestFairCallQueue extends TestCase {
}
public void testPeekPointsAtHead() {
- Schedulable call = mockCall("c");
- Schedulable next = mockCall("b");
+ Schedulable call = mockCall("c", 0);
+ Schedulable next = mockCall("b", 0);
fcq.offer(call);
fcq.offer(next);
@@ -139,15 +140,11 @@ public class TestFairCallQueue extends TestCase {
}
public void testPollTimeout() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
-
assertNull(fcq.poll(10, TimeUnit.MILLISECONDS));
}
public void testPollSuccess() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
-
- Schedulable call = mockCall("c");
+ Schedulable call = mockCall("c", 0);
assertTrue(fcq.offer(call));
assertEquals(call, fcq.poll(10, TimeUnit.MILLISECONDS));
@@ -156,7 +153,6 @@ public class TestFairCallQueue extends TestCase {
}
public void testOfferTimeout() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
for (int i = 0; i < 5; i++) {
assertTrue(fcq.offer(mockCall("c"), 10, TimeUnit.MILLISECONDS));
}
@@ -166,13 +162,11 @@ public class TestFairCallQueue extends TestCase {
assertEquals(5, fcq.size());
}
+ @SuppressWarnings("deprecation")
public void testDrainTo() {
Configuration conf = new Configuration();
- conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
- FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
-
- fcq.setScheduler(alwaysZeroScheduler);
- fcq2.setScheduler(alwaysZeroScheduler);
+ conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+ FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
// Start with 3 in fcq, to be drained
for (int i = 0; i < 3; i++) {
@@ -185,13 +179,11 @@ public class TestFairCallQueue extends TestCase {
assertEquals(3, fcq2.size());
}
+ @SuppressWarnings("deprecation")
public void testDrainToWithLimit() {
Configuration conf = new Configuration();
- conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
- FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
-
- fcq.setScheduler(alwaysZeroScheduler);
- fcq2.setScheduler(alwaysZeroScheduler);
+ conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+ FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
// Start with 3 in fcq, to be drained
for (int i = 0; i < 3; i++) {
@@ -209,27 +201,23 @@ public class TestFairCallQueue extends TestCase {
}
public void testFirstQueueFullRemainingCapacity() {
- fcq.setScheduler(alwaysZeroScheduler);
while (fcq.offer(mockCall("c"))) ; // Queue 0 will fill up first, then queue 1
assertEquals(5, fcq.remainingCapacity());
}
public void testAllQueuesFullRemainingCapacity() {
- RpcScheduler sched = mock(RpcScheduler.class);
- when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 1, 1, 1, 1);
- fcq.setScheduler(sched);
- while (fcq.offer(mockCall("c"))) ;
+ int[] mockedPriorities = {0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0};
+ int i = 0;
+ while (fcq.offer(mockCall("c", mockedPriorities[i++]))) ;
assertEquals(0, fcq.remainingCapacity());
assertEquals(10, fcq.size());
}
public void testQueuesPartialFilledRemainingCapacity() {
- RpcScheduler sched = mock(RpcScheduler.class);
- when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 1, 0, 1, 0);
- fcq.setScheduler(sched);
- for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c")); }
+ int[] mockedPriorities = {0, 1, 0, 1, 0};
+ for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c", mockedPriorities[i])); }
assertEquals(5, fcq.remainingCapacity());
assertEquals(5, fcq.size());
@@ -351,16 +339,12 @@ public class TestFairCallQueue extends TestCase {
// Make sure put will overflow into lower queues when the top is full
public void testPutOverflows() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
-
// We can fit more than 5, even though the scheduler suggests the top queue
assertCanPut(fcq, 8, 8);
assertEquals(8, fcq.size());
}
public void testPutBlocksWhenAllFull() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
-
assertCanPut(fcq, 10, 10); // Fill up
assertEquals(10, fcq.size());
@@ -369,12 +353,10 @@ public class TestFairCallQueue extends TestCase {
}
public void testTakeBlocksWhenEmpty() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
assertCanTake(fcq, 0, 1);
}
public void testTakeRemovesCall() throws InterruptedException {
- fcq.setScheduler(alwaysZeroScheduler);
Schedulable call = mockCall("c");
fcq.offer(call);
@@ -383,17 +365,14 @@ public class TestFairCallQueue extends TestCase {
}
public void testTakeTriesNextQueue() throws InterruptedException {
- // Make a FCQ filled with calls in q 1 but empty in q 0
- RpcScheduler q1Scheduler = mock(RpcScheduler.class);
- when(q1Scheduler.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(1);
- fcq.setScheduler(q1Scheduler);
// A mux which only draws from q 0
RpcMultiplexer q0mux = mock(RpcMultiplexer.class);
when(q0mux.getAndAdvanceCurrentIndex()).thenReturn(0);
fcq.setMultiplexer(q0mux);
- Schedulable call = mockCall("c");
+ // Make a FCQ filled with calls in q 1 but empty in q 0
+ Schedulable call = mockCall("c", 1);
fcq.put(call);
// Take from q1 even though mux said q0, since q0 empty
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
index 1fa0fff..2638412 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
@@ -19,25 +19,15 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
import java.util.List;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.conf.Configuration;
@@ -55,16 +45,20 @@ public class TestIdentityProviders {
}
}
+ @Override
+ public int getPriorityLevel() {
+ return 0;
+ }
}
@Test
public void testPluggableIdentityProvider() {
Configuration conf = new Configuration();
- conf.set(CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+ conf.set(CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
"org.apache.hadoop.ipc.UserIdentityProvider");
List<IdentityProvider> providers = conf.getInstances(
- CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+ CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
assertTrue(providers.size() == 1);