You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/08/14 20:45:32 UTC
[hadoop] branch branch-3.1 updated: HDFS-14403. Cost-based
extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 96ea7f0 HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.
96ea7f0 is described below
commit 96ea7f056238183cbd73d07bb488fd6440ff233a
Author: Christopher Gregorian <cs...@gmail.com>
AuthorDate: Fri May 24 17:09:52 2019 -0700
HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.
(cherry picked from 129576f628d370def74e56112aba3a93e97bbf70)
(cherry picked from e4b650f91e7353f98fee2a725490ca45aa81e1a4)
---
.../apache/hadoop/fs/CommonConfigurationKeys.java | 1 +
.../org/apache/hadoop/ipc/CallQueueManager.java | 1 -
.../java/org/apache/hadoop/ipc/CostProvider.java | 46 ++++
.../org/apache/hadoop/ipc/DecayRpcScheduler.java | 235 ++++++++++++---------
.../org/apache/hadoop/ipc/DefaultCostProvider.java | 43 ++++
.../hadoop/ipc/WeightedTimeCostProvider.java | 110 ++++++++++
.../apache/hadoop/ipc/TestDecayRpcScheduler.java | 174 ++++++++++++---
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 102 +++++----
.../hadoop/ipc/TestWeightedTimeCostProvider.java | 86 ++++++++
9 files changed, 613 insertions(+), 185 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 1eb27f8..7574949 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -104,6 +104,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+ public static final String IPC_COST_PROVIDER_KEY = "cost-provider.impl";
public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 765ce18..b1c9f6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -197,7 +197,6 @@ public class CallQueueManager<E extends Schedulable>
}
// This should be only called once per call and cached in the call object
- // each getPriorityLevel call will increment the counter for the caller
int getPriorityLevel(Schedulable e) {
return scheduler.getPriorityLevel(e);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java
new file mode 100644
index 0000000..cf76e7d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by {@link DecayRpcScheduler} to get the cost of users' operations. This
+ * is configurable using
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}.
+ */
+public interface CostProvider {
+
+ /**
+ * Initialize this provider using the given configuration, examining only
+ * configuration keys which fall within the provided namespace.
+ *
+ * @param namespace The namespace to use when looking up configurations.
+ * @param conf The configuration
+ */
+ void init(String namespace, Configuration conf);
+
+ /**
+ * Get cost from {@link ProcessingDetails} which will be used in scheduler.
+ *
+ * @param details Process details
+ * @return The cost of the call
+ */
+ long getCost(ProcessingDetails details);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 3c1569a..e2dd4f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -58,8 +58,8 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
/**
- * The decay RPC scheduler counts incoming requests in a map, then
- * decays the counts at a fixed time interval. The scheduler is optimized
+ * The decay RPC scheduler tracks the cost of incoming requests in a map, then
+ * decays the costs at a fixed time interval. The scheduler is optimized
* for large periods (on the order of seconds), as it offloads work to the
* decay sweep.
*/
@@ -134,15 +134,15 @@ public class DecayRpcScheduler implements RpcScheduler,
private static final ObjectWriter WRITER = new ObjectMapper().writer();
// Track the decayed and raw (no decay) number of calls for each schedulable
- // identity from all previous decay windows: idx 0 for decayed call count and
- // idx 1 for the raw call count
- private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
+ // identity from all previous decay windows: idx 0 for decayed call cost and
+ // idx 1 for the raw call cost
+ private final ConcurrentHashMap<Object, List<AtomicLong>> callCosts =
new ConcurrentHashMap<Object, List<AtomicLong>>();
- // Should be the sum of all AtomicLongs in decayed callCounts
- private final AtomicLong totalDecayedCallCount = new AtomicLong();
- // The sum of all AtomicLongs in raw callCounts
- private final AtomicLong totalRawCallCount = new AtomicLong();
+ // Should be the sum of all AtomicLongs in decayed callCosts
+ private final AtomicLong totalDecayedCallCost = new AtomicLong();
+ // The sum of all AtomicLongs in raw callCosts
+ private final AtomicLong totalRawCallCost = new AtomicLong();
// Track total call count and response time in current decay window
@@ -160,7 +160,7 @@ public class DecayRpcScheduler implements RpcScheduler,
// Tune the behavior of the scheduler
private final long decayPeriodMillis; // How long between each tick
- private final double decayFactor; // nextCount = currentCount * decayFactor
+ private final double decayFactor; // nextCost = currentCost * decayFactor
private final int numLevels;
private final double[] thresholds;
private final IdentityProvider identityProvider;
@@ -170,9 +170,10 @@ public class DecayRpcScheduler implements RpcScheduler,
private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
private MetricsProxy metricsProxy;
+ private final CostProvider costProvider;
/**
- * This TimerTask will call decayCurrentCounts until
+ * This TimerTask will call decayCurrentCosts until
* the scheduler has been garbage collected.
*/
public static class DecayTask extends TimerTask {
@@ -188,7 +189,7 @@ public class DecayRpcScheduler implements RpcScheduler,
public void run() {
DecayRpcScheduler sched = schedulerRef.get();
if (sched != null) {
- sched.decayCurrentCounts();
+ sched.decayCurrentCosts();
} else {
// Our scheduler was garbage collected since it is no longer in use,
// so we should terminate the timer as well
@@ -215,6 +216,7 @@ public class DecayRpcScheduler implements RpcScheduler,
this.decayFactor = parseDecayFactor(ns, conf);
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
this.identityProvider = this.parseIdentityProvider(ns, conf);
+ this.costProvider = this.parseCostProvider(ns, conf);
this.thresholds = parseThresholds(ns, conf, numLevels);
this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
conf);
@@ -242,6 +244,24 @@ public class DecayRpcScheduler implements RpcScheduler,
recomputeScheduleCache();
}
+ private CostProvider parseCostProvider(String ns, Configuration conf) {
+ List<CostProvider> providers = conf.getInstances(
+ ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+ CostProvider.class);
+
+ if (providers.size() < 1) {
+ LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
+ return new DefaultCostProvider();
+ } else if (providers.size() > 1) {
+ LOG.warn("Found multiple CostProviders; using: {}",
+ providers.get(0).getClass());
+ }
+
+ CostProvider provider = providers.get(0); // use the first
+ provider.init(ns, conf);
+ return provider;
+ }
+
// Load configs
private IdentityProvider parseIdentityProvider(String ns,
Configuration conf) {
@@ -388,41 +408,41 @@ public class DecayRpcScheduler implements RpcScheduler,
}
/**
- * Decay the stored counts for each user and clean as necessary.
+ * Decay the stored costs for each user and clean as necessary.
* This method should be called periodically in order to keep
- * counts current.
+ * costs current.
*/
- private void decayCurrentCounts() {
+ private void decayCurrentCosts() {
try {
- long totalDecayedCount = 0;
- long totalRawCount = 0;
+ long totalDecayedCost = 0;
+ long totalRawCost = 0;
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
- callCounts.entrySet().iterator();
+ callCosts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, List<AtomicLong>> entry = it.next();
- AtomicLong decayedCount = entry.getValue().get(0);
- AtomicLong rawCount = entry.getValue().get(1);
+ AtomicLong decayedCost = entry.getValue().get(0);
+ AtomicLong rawCost = entry.getValue().get(1);
// Compute the next value by reducing it by the decayFactor
- totalRawCount += rawCount.get();
- long currentValue = decayedCount.get();
+ totalRawCost += rawCost.get();
+ long currentValue = decayedCost.get();
long nextValue = (long) (currentValue * decayFactor);
- totalDecayedCount += nextValue;
- decayedCount.set(nextValue);
+ totalDecayedCost += nextValue;
+ decayedCost.set(nextValue);
if (nextValue == 0) {
// We will clean up unused keys here. An interesting optimization
- // might be to have an upper bound on keyspace in callCounts and only
+ // might be to have an upper bound on keyspace in callCosts and only
// clean once we pass it.
it.remove();
}
}
// Update the total so that we remain in sync
- totalDecayedCallCount.set(totalDecayedCount);
- totalRawCallCount.set(totalRawCount);
+ totalDecayedCallCost.set(totalDecayedCost);
+ totalRawCallCost.set(totalRawCost);
// Now refresh the cache of scheduling decisions
recomputeScheduleCache();
@@ -430,19 +450,19 @@ public class DecayRpcScheduler implements RpcScheduler,
// Update average response time with decay
updateAverageResponseTime(true);
} catch (Exception ex) {
- LOG.error("decayCurrentCounts exception: " +
- ExceptionUtils.getFullStackTrace(ex));
+ LOG.error("decayCurrentCosts exception: " +
+ ExceptionUtils.getStackTrace(ex));
throw ex;
}
}
/**
- * Update the scheduleCache to match current conditions in callCounts.
+ * Update the scheduleCache to match current conditions in callCosts.
*/
private void recomputeScheduleCache() {
Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
- for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
Object id = entry.getKey();
AtomicLong value = entry.getValue().get(0);
@@ -457,51 +477,52 @@ public class DecayRpcScheduler implements RpcScheduler,
}
/**
- * Get the number of occurrences and increment atomically.
- * @param identity the identity of the user to increment
- * @return the value before incrementation
+ * Adjust the stored cost for a given identity.
+ *
+ * @param identity the identity of the user whose cost should be adjusted
+ * @param costDelta the cost to add for the given identity
*/
- private long getAndIncrementCallCounts(Object identity)
- throws InterruptedException {
- // We will increment the count, or create it if no such count exists
- List<AtomicLong> count = this.callCounts.get(identity);
- if (count == null) {
- // Create the counts since no such count exists.
- // idx 0 for decayed call count
- // idx 1 for the raw call count
- count = new ArrayList<AtomicLong>(2);
- count.add(new AtomicLong(0));
- count.add(new AtomicLong(0));
+ private void addCost(Object identity, long costDelta) {
+ // We will increment the cost, or create it if no such cost exists
+ List<AtomicLong> cost = this.callCosts.get(identity);
+ if (cost == null) {
+ // Create the costs since no such cost exists.
+ // idx 0 for decayed call cost
+ // idx 1 for the raw call cost
+ cost = new ArrayList<AtomicLong>(2);
+ cost.add(new AtomicLong(0));
+ cost.add(new AtomicLong(0));
// Put it in, or get the AtomicInteger that was put in by another thread
- List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
- if (otherCount != null) {
- count = otherCount;
+ List<AtomicLong> otherCost = callCosts.putIfAbsent(identity, cost);
+ if (otherCost != null) {
+ cost = otherCost;
}
}
// Update the total
- totalDecayedCallCount.getAndIncrement();
- totalRawCallCount.getAndIncrement();
+ totalDecayedCallCost.getAndAdd(costDelta);
+ totalRawCallCost.getAndAdd(costDelta);
// At this point value is guaranteed to be not null. It may however have
- // been clobbered from callCounts. Nonetheless, we return what
+ // been clobbered from callCosts. Nonetheless, we update what
// we have.
- count.get(1).getAndIncrement();
- return count.get(0).getAndIncrement();
+ cost.get(1).getAndAdd(costDelta);
+ cost.get(0).getAndAdd(costDelta);
}
/**
- * Given the number of occurrences, compute a scheduling decision.
- * @param occurrences how many occurrences
+ * Given the cost for an identity, compute a scheduling decision.
+ *
+ * @param cost the cost for an identity
* @return scheduling decision from 0 to numLevels - 1
*/
- private int computePriorityLevel(long occurrences) {
- long totalCallSnapshot = totalDecayedCallCount.get();
+ private int computePriorityLevel(long cost) {
+ long totalCallSnapshot = totalDecayedCallCost.get();
double proportion = 0;
if (totalCallSnapshot > 0) {
- proportion = (double) occurrences / totalCallSnapshot;
+ proportion = (double) cost / totalCallSnapshot;
}
// Start with low priority levels, since they will be most common
@@ -522,31 +543,23 @@ public class DecayRpcScheduler implements RpcScheduler,
* @return integer scheduling decision from 0 to numLevels - 1
*/
private int cachedOrComputedPriorityLevel(Object identity) {
- try {
- long occurrences = this.getAndIncrementCallCounts(identity);
-
- // Try the cache
- Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
- if (scheduleCache != null) {
- Integer priority = scheduleCache.get(identity);
- if (priority != null) {
- LOG.debug("Cache priority for: {} with priority: {}", identity,
- priority);
- return priority;
- }
+ // Try the cache
+ Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
+ if (scheduleCache != null) {
+ Integer priority = scheduleCache.get(identity);
+ if (priority != null) {
+ LOG.debug("Cache priority for: {} with priority: {}", identity,
+ priority);
+ return priority;
}
-
- // Cache was no good, compute it
- int priority = computePriorityLevel(occurrences);
- LOG.debug("compute priority for " + identity + " priority " + priority);
- return priority;
-
- } catch (InterruptedException ie) {
- LOG.warn("Caught InterruptedException, returning low priority level");
- LOG.debug("Fallback priority for: {} with priority: {}", identity,
- numLevels - 1);
- return numLevels - 1;
}
+
+ // Cache was no good, compute it
+ List<AtomicLong> costList = callCosts.get(identity);
+ long currentCost = costList == null ? 0 : costList.get(0).get();
+ int priority = computePriorityLevel(currentCost);
+ LOG.debug("compute priority for {} priority {}", identity, priority);
+ return priority;
}
/**
@@ -596,6 +609,10 @@ public class DecayRpcScheduler implements RpcScheduler,
@Override
public void addResponseTime(String callName, Schedulable schedulable,
ProcessingDetails details) {
+ String user = identityProvider.makeIdentity(schedulable);
+ long processingCost = costProvider.getCost(details);
+ addCost(user, processingCost);
+
int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
@@ -643,22 +660,30 @@ public class DecayRpcScheduler implements RpcScheduler,
// For testing
@VisibleForTesting
- public double getDecayFactor() { return decayFactor; }
+ double getDecayFactor() {
+ return decayFactor;
+ }
@VisibleForTesting
- public long getDecayPeriodMillis() { return decayPeriodMillis; }
+ long getDecayPeriodMillis() {
+ return decayPeriodMillis;
+ }
@VisibleForTesting
- public double[] getThresholds() { return thresholds; }
+ double[] getThresholds() {
+ return thresholds;
+ }
@VisibleForTesting
- public void forceDecay() { decayCurrentCounts(); }
+ void forceDecay() {
+ decayCurrentCosts();
+ }
@VisibleForTesting
- public Map<Object, Long> getCallCountSnapshot() {
+ Map<Object, Long> getCallCostSnapshot() {
HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
- for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
snapshot.put(entry.getKey(), entry.getValue().get(0).get());
}
@@ -666,8 +691,8 @@ public class DecayRpcScheduler implements RpcScheduler,
}
@VisibleForTesting
- public long getTotalCallSnapshot() {
- return totalDecayedCallCount.get();
+ long getTotalCallSnapshot() {
+ return totalDecayedCallCost.get();
}
/**
@@ -800,15 +825,15 @@ public class DecayRpcScheduler implements RpcScheduler,
}
public int getUniqueIdentityCount() {
- return callCounts.size();
+ return callCosts.size();
}
public long getTotalCallVolume() {
- return totalDecayedCallCount.get();
+ return totalDecayedCallCost.get();
}
public long getTotalRawCallVolume() {
- return totalRawCallCount.get();
+ return totalRawCallCost.get();
}
public long[] getResponseTimeCountInLastWindow() {
@@ -901,17 +926,17 @@ public class DecayRpcScheduler implements RpcScheduler,
}
}
- // Get the top N callers' raw call count and scheduler decision
+ // Get the top N callers' raw call cost and scheduler decision
private TopN getTopCallers(int n) {
TopN topNCallers = new TopN(n);
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
- callCounts.entrySet().iterator();
+ callCosts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, List<AtomicLong>> entry = it.next();
String caller = entry.getKey().toString();
- Long count = entry.getValue().get(1).get();
- if (count > 0) {
- topNCallers.offer(new NameValuePair(caller, count));
+ Long cost = entry.getValue().get(1).get();
+ if (cost > 0) {
+ topNCallers.offer(new NameValuePair(caller, cost));
}
}
return topNCallers;
@@ -932,25 +957,25 @@ public class DecayRpcScheduler implements RpcScheduler,
public String getCallVolumeSummary() {
try {
- return WRITER.writeValueAsString(getDecayedCallCounts());
+ return WRITER.writeValueAsString(getDecayedCallCosts());
} catch (Exception e) {
return "Error: " + e.getMessage();
}
}
- private Map<Object, Long> getDecayedCallCounts() {
- Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
+ private Map<Object, Long> getDecayedCallCosts() {
+ Map<Object, Long> decayedCallCosts = new HashMap<>(callCosts.size());
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
- callCounts.entrySet().iterator();
+ callCosts.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, List<AtomicLong>> entry = it.next();
Object user = entry.getKey();
- Long decayedCount = entry.getValue().get(0).get();
- if (decayedCount > 0) {
- decayedCallCounts.put(user, decayedCount);
+ Long decayedCost = entry.getValue().get(0).get();
+ if (decayedCost > 0) {
+ decayedCallCosts.put(user, decayedCost);
}
}
- return decayedCallCounts;
+ return decayedCallCosts;
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java
new file mode 100644
index 0000000..ad56ddf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Ignores process details and returns a constant value for each call.
+ */
+public class DefaultCostProvider implements CostProvider {
+
+ @Override
+ public void init(String namespace, Configuration conf) {
+ // No-op
+ }
+
+ /**
+ * Returns 1, regardless of the processing details.
+ *
+ * @param details Process details (ignored)
+ * @return 1
+ */
+ @Override
+ public long getCost(ProcessingDetails details) {
+ return 1;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java
new file mode 100644
index 0000000..4304b24
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.Locale;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+
+/**
+ * A {@link CostProvider} that calculates the cost for an operation
+ * as a weighted sum of its processing time values (see
+ * {@link ProcessingDetails}). This can be used by specifying the
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}
+ * configuration key.
+ *
+ * <p>This allows for configuration of how heavily each of the operations
+ * within {@link ProcessingDetails} is weighted. By default,
+ * {@link ProcessingDetails.Timing#LOCKFREE},
+ * {@link ProcessingDetails.Timing#RESPONSE}, and
+ * {@link ProcessingDetails.Timing#HANDLER} times have a weight of
+ * {@value #DEFAULT_LOCKFREE_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKSHARED} has a weight of
+ * {@value #DEFAULT_LOCKSHARED_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKEXCLUSIVE} has a weight of
+ * {@value #DEFAULT_LOCKEXCLUSIVE_WEIGHT}, and others are ignored.
+ * These values can all be configured using the {@link #WEIGHT_CONFIG_PREFIX}
+ * key, prefixed with the IPC namespace, and suffixed with the name of the
+ * timing measurement from {@link ProcessingDetails} (all lowercase).
+ * For example, to set the lock exclusive weight to be 1000, set:
+ * <pre>
+ * ipc.8020.cost-provider.impl=org.apache.hadoop.ipc.WeightedTimeCostProvider
+ * ipc.8020.weighted-cost.lockexclusive=1000
+ * </pre>
+ */
+public class WeightedTimeCostProvider implements CostProvider {
+
+ /**
+ * The prefix used in configuration values specifying the weight to use when
+ * determining the cost of an operation. See the class Javadoc for more info.
+ */
+ public static final String WEIGHT_CONFIG_PREFIX = ".weighted-cost.";
+ static final int DEFAULT_LOCKFREE_WEIGHT = 1;
+ static final int DEFAULT_LOCKSHARED_WEIGHT = 10;
+ static final int DEFAULT_LOCKEXCLUSIVE_WEIGHT = 100;
+
+ private long[] weights;
+
+ @Override
+ public void init(String namespace, Configuration conf) {
+ weights = new long[Timing.values().length];
+ for (Timing timing : ProcessingDetails.Timing.values()) {
+ final int defaultValue;
+ switch (timing) {
+ case LOCKFREE:
+ case RESPONSE:
+ case HANDLER:
+ defaultValue = DEFAULT_LOCKFREE_WEIGHT;
+ break;
+ case LOCKSHARED:
+ defaultValue = DEFAULT_LOCKSHARED_WEIGHT;
+ break;
+ case LOCKEXCLUSIVE:
+ defaultValue = DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+ break;
+ default:
+ // by default don't bill for queueing or lock wait time
+ defaultValue = 0;
+ }
+ String key = namespace + WEIGHT_CONFIG_PREFIX
+ + timing.name().toLowerCase(Locale.ENGLISH);
+ weights[timing.ordinal()] = conf.getInt(key, defaultValue);
+ }
+ }
+
+ /**
+ * Calculates a weighted sum of the times stored on the provided processing
+ * details to be used as the cost in {@link DecayRpcScheduler}.
+ *
+ * @param details Processing details
+ * @return The weighted sum of the times. The returned unit is the same
+ * as the default unit used by the provided processing details.
+ */
+ @Override
+ public long getCost(ProcessingDetails details) {
+ assert weights != null : "Cost provider must be initialized before use";
+ long cost = 0;
+ // weights was initialized to the same length as Timing.values()
+ for (int i = 0; i < Timing.values().length; i++) {
+ cost += details.get(Timing.values()[i]) * weights[i];
+ }
+ return cost;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 10ab40a..7bdc6b5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import javax.management.ObjectName;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) {
@@ -131,67 +133,69 @@ public class TestDecayRpcScheduler {
conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
scheduler = new DecayRpcScheduler(1, "ns", conf);
- assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
+ assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first
- scheduler.getPriorityLevel(mockCall("A"));
- assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
- assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
+ getPriorityIncrementCallCount("A");
+ assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+ assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
- scheduler.getPriorityLevel(mockCall("A"));
- scheduler.getPriorityLevel(mockCall("B"));
- scheduler.getPriorityLevel(mockCall("A"));
+ getPriorityIncrementCallCount("A");
+ getPriorityIncrementCallCount("B");
+ getPriorityIncrementCallCount("A");
- assertEquals(3, scheduler.getCallCountSnapshot().get("A").longValue());
- assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+ assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue());
+ assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
}
@Test
@SuppressWarnings("deprecation")
public void testDecay() throws Exception {
Configuration conf = new Configuration();
- conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
- conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+ conf.setLong("ns." // Never decay
+ + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
+ conf.setDouble("ns."
+ + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
scheduler = new DecayRpcScheduler(1, "ns", conf);
assertEquals(0, scheduler.getTotalCallSnapshot());
for (int i = 0; i < 4; i++) {
- scheduler.getPriorityLevel(mockCall("A"));
+ getPriorityIncrementCallCount("A");
}
sleep(1000);
for (int i = 0; i < 8; i++) {
- scheduler.getPriorityLevel(mockCall("B"));
+ getPriorityIncrementCallCount("B");
}
assertEquals(12, scheduler.getTotalCallSnapshot());
- assertEquals(4, scheduler.getCallCountSnapshot().get("A").longValue());
- assertEquals(8, scheduler.getCallCountSnapshot().get("B").longValue());
+ assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue());
+ assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue());
scheduler.forceDecay();
assertEquals(6, scheduler.getTotalCallSnapshot());
- assertEquals(2, scheduler.getCallCountSnapshot().get("A").longValue());
- assertEquals(4, scheduler.getCallCountSnapshot().get("B").longValue());
+ assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue());
+ assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue());
scheduler.forceDecay();
assertEquals(3, scheduler.getTotalCallSnapshot());
- assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
- assertEquals(2, scheduler.getCallCountSnapshot().get("B").longValue());
+ assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+ assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue());
scheduler.forceDecay();
assertEquals(1, scheduler.getTotalCallSnapshot());
- assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
- assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+ assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+ assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
scheduler.forceDecay();
assertEquals(0, scheduler.getTotalCallSnapshot());
- assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
- assertEquals(null, scheduler.getCallCountSnapshot().get("B"));
+ assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+ assertEquals(null, scheduler.getCallCostSnapshot().get("B"));
}
@Test
@@ -205,16 +209,16 @@ public class TestDecayRpcScheduler {
.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
scheduler = new DecayRpcScheduler(4, namespace, conf);
- assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
- assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
- assertEquals(0, scheduler.getPriorityLevel(mockCall("B")));
- assertEquals(1, scheduler.getPriorityLevel(mockCall("B")));
- assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
- assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
- assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
- assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
- assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
- assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
+ assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls
+ assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls
+ assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls
+ assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls
+ assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls
+ assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls
+ assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls
+ assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls
+ assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls
+ assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
@@ -243,7 +247,7 @@ public class TestDecayRpcScheduler {
assertEquals(0, scheduler.getTotalCallSnapshot());
for (int i = 0; i < 64; i++) {
- scheduler.getPriorityLevel(mockCall("A"));
+ getPriorityIncrementCallCount("A");
}
// It should eventually decay to zero
@@ -272,6 +276,108 @@ public class TestDecayRpcScheduler {
//set systout back
System.setOut(output);
}
+ }
+
+ @Test
+ public void testUsingWeightedTimeCostProvider() {
+ scheduler = getSchedulerWithWeightedTimeCostProvider(3); // 3 priority levels
+
+ // 3 details in increasing order of cost. Although medium has a longer
+ // duration, the shared lock is weighted less than the exclusive lock
+ ProcessingDetails callDetailsLow =
+ new ProcessingDetails(TimeUnit.MILLISECONDS);
+ callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1);
+ ProcessingDetails callDetailsMedium =
+ new ProcessingDetails(TimeUnit.MILLISECONDS);
+ callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500);
+ ProcessingDetails callDetailsHigh =
+ new ProcessingDetails(TimeUnit.MILLISECONDS);
+ callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100);
+
+ for (int i = 0; i < 10; i++) { // ten calls recorded for LOW ...
+ scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow);
+ }
+ scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium); // ... one for MED
+ scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh); // ... one for HIGH
+
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW"))); // levels are expected to follow the cost ordering above
+ assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+ assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+ assertEquals(3, scheduler.getUniqueIdentityCount()); // LOW, MED and HIGH
+ long totalCallInitial = scheduler.getTotalRawCallVolume();
+ assertEquals(totalCallInitial, scheduler.getTotalCallVolume()); // forceDecay() has not been called yet in this test
+
+ scheduler.forceDecay();
+
+ // Relative priorities should stay the same after a single decay
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+ assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+ assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+ assertEquals(3, scheduler.getUniqueIdentityCount());
+ assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume()); // raw volume is expected to be unaffected by the decay
+ assertTrue(scheduler.getTotalCallVolume() < totalCallInitial); // decayed volume is expected to have shrunk
+
+ for (int i = 0; i < 100; i++) {
+ scheduler.forceDecay();
+ }
+ // After enough decay cycles, all callers should be high priority (level 0) again
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("MED")));
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH")));
+ }
+
+ @Test
+ public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
+ scheduler = getSchedulerWithWeightedTimeCostProvider(2); // 2 priority levels
+
+ ProcessingDetails emptyDetails =
+ new ProcessingDetails(TimeUnit.MILLISECONDS); // no timings are set on this instance
+
+ for (int i = 0; i < 1000; i++) { // 1000 calls recorded for MANY ...
+ scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails);
+ }
+ scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails); // ... versus a single call for FEW
+
+ // Since the calls are all "free", they should have the same priority
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY")));
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW")));
+ }
+
+ @Test
+ public void testUsingWeightedTimeCostProviderNoRequests() {
+ scheduler = getSchedulerWithWeightedTimeCostProvider(2); // 2 priority levels
+
+ assertEquals(0, scheduler.getPriorityLevel(mockCall("A"))); // queried with no response times recorded beforehand
+ }
+
+ /**
+ * Get a scheduler that uses {@link WeightedTimeCostProvider} and has
+ * normal decaying disabled (decay period set to 999999), under namespace "ns".
+ */
+ private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
+ int priorityLevels) { // forwarded as the first DecayRpcScheduler constructor argument
+ Configuration conf = new Configuration();
+ conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, // key is prefixed with the "ns" namespace
+ WeightedTimeCostProvider.class, CostProvider.class);
+ conf.setLong("ns."
+ + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999); // large period; callers in this file trigger decay via forceDecay()
+ return new DecayRpcScheduler(priorityLevels, "ns", conf);
+ }
+ /**
+ * Get the priority and increment the call count, assuming that
+ * {@link DefaultCostProvider} is in use.
+ */
+ private int getPriorityIncrementCallCount(String callId) {
+ Schedulable mockCall = mockCall(callId);
+ int priority = scheduler.getPriorityLevel(mockCall); // read before this call is recorded below
+ // The DefaultCostProvider uses a cost of 1 for all calls, ignoring
+ // the processing details, so an empty one is fine
+ ProcessingDetails emptyProcessingDetails =
+ new ProcessingDetails(TimeUnit.MILLISECONDS);
+ scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails); // records the call against the same mock caller
+ return priority;
}
}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 8c7881a..95bc673 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ipc;
-import com.google.common.base.Supplier;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@@ -1187,15 +1186,6 @@ public class TestRPC extends TestRpcBase {
Exception lastException = null;
proxy = getClient(addr, conf);
- MetricsRecordBuilder rb1 =
- getMetrics("DecayRpcSchedulerMetrics2." + ns);
- final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
- "DecayedCallVolume", rb1);
- final long beginRawCallVolume = MetricsAsserts.getLongCounter(
- "CallVolume", rb1);
- final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
- rb1);
-
try {
// start a sleep RPC call that sleeps 3s.
for (int i = 0; i < numClients; i++) {
@@ -1223,41 +1213,6 @@ public class TestRPC extends TestRpcBase {
} else {
lastException = unwrapExeption;
}
-
- // Lets Metric system update latest metrics
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- MetricsRecordBuilder rb2 =
- getMetrics("DecayRpcSchedulerMetrics2." + ns);
- long decayedCallVolume1 = MetricsAsserts.getLongCounter(
- "DecayedCallVolume", rb2);
- long rawCallVolume1 = MetricsAsserts.getLongCounter(
- "CallVolume", rb2);
- int uniqueCaller1 = MetricsAsserts.getIntCounter(
- "UniqueCallers", rb2);
- long callVolumePriority0 = MetricsAsserts.getLongGauge(
- "Priority.0.CompletedCallVolume", rb2);
- long callVolumePriority1 = MetricsAsserts.getLongGauge(
- "Priority.1.CompletedCallVolume", rb2);
- double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
- "Priority.0.AvgResponseTime", rb2);
- double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
- "Priority.1.AvgResponseTime", rb2);
-
- LOG.info("DecayedCallVolume: " + decayedCallVolume1);
- LOG.info("CallVolume: " + rawCallVolume1);
- LOG.info("UniqueCaller: " + uniqueCaller1);
- LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
- LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
- LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
- LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
-
- return decayedCallVolume1 > beginDecayedCallVolume &&
- rawCallVolume1 > beginRawCallVolume &&
- uniqueCaller1 > beginUniqueCaller;
- }
- }, 30, 60000);
}
} finally {
executorService.shutdown();
@@ -1269,6 +1224,63 @@ public class TestRPC extends TestRpcBase {
assertTrue("RetriableException not received", succeeded);
}
+ /** Test that the metrics for DecayRpcScheduler are updated. */
+ @Test (timeout=30000)
+ public void testDecayRpcSchedulerMetrics() throws Exception {
+ final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+ Server server = setupDecayRpcSchedulerandTestServer(ns + "."); // helper is given the namespace with a trailing dot
+
+ MetricsRecordBuilder rb1 =
+ getMetrics("DecayRpcSchedulerMetrics2." + ns); // baseline snapshot, taken before this test issues any RPC
+ final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
+ "DecayedCallVolume", rb1);
+ final long beginRawCallVolume = MetricsAsserts.getLongCounter(
+ "CallVolume", rb1);
+ final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
+ rb1);
+
+ TestRpcService proxy = getClient(addr, conf);
+ try {
+ for (int i = 0; i < 2; i++) { // issue two sleep RPCs through the proxy
+ proxy.sleep(null, newSleepRequest(100));
+ }
+
+ // Lets Metric system update latest metrics
+ GenericTestUtils.waitFor(() -> {
+ MetricsRecordBuilder rb2 =
+ getMetrics("DecayRpcSchedulerMetrics2." + ns); // fresh snapshot on every poll
+ long decayedCallVolume1 = MetricsAsserts.getLongCounter(
+ "DecayedCallVolume", rb2);
+ long rawCallVolume1 = MetricsAsserts.getLongCounter(
+ "CallVolume", rb2);
+ int uniqueCaller1 = MetricsAsserts.getIntCounter(
+ "UniqueCallers", rb2);
+ long callVolumePriority0 = MetricsAsserts.getLongGauge(
+ "Priority.0.CompletedCallVolume", rb2);
+ long callVolumePriority1 = MetricsAsserts.getLongGauge(
+ "Priority.1.CompletedCallVolume", rb2);
+ double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
+ "Priority.0.AvgResponseTime", rb2);
+ double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
+ "Priority.1.AvgResponseTime", rb2);
+
+ LOG.info("DecayedCallVolume: {}", decayedCallVolume1);
+ LOG.info("CallVolume: {}", rawCallVolume1);
+ LOG.info("UniqueCaller: {}", uniqueCaller1);
+ LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0); // per-priority gauges are only logged, not asserted
+ LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1);
+ LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0);
+ LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1);
+
+ return decayedCallVolume1 > beginDecayedCallVolume && // condition holds once all three counters exceed their baselines
+ rawCallVolume1 > beginRawCallVolume &&
+ uniqueCaller1 > beginUniqueCaller;
+ }, 30, 60000); // NOTE(review): if both values are milliseconds, this 60000 bound exceeds the 30000 JUnit timeout above - confirm
+ } finally {
+ stop(server, proxy); // runs whether or not the wait above succeeds
+ }
+ }
+
private Server setupDecayRpcSchedulerandTestServer(String ns)
throws Exception {
final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java
new file mode 100644
index 0000000..4f4a72b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKFREE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKSHARED_WEIGHT;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link WeightedTimeCostProvider}. */
+public class TestWeightedTimeCostProvider {
+
+ private static final int QUEUE_TIME = 3; // the four timings are distinct primes; presumably so each weighted term stays distinguishable
+ private static final int LOCKFREE_TIME = 5;
+ private static final int LOCKSHARED_TIME = 7;
+ private static final int LOCKEXCLUSIVE_TIME = 11;
+
+ private WeightedTimeCostProvider costProvider;
+ private ProcessingDetails processingDetails;
+
+ @Before
+ public void setup() {
+ costProvider = new WeightedTimeCostProvider(); // deliberately not init()-ed here; each test decides
+ processingDetails = new ProcessingDetails(TimeUnit.MILLISECONDS);
+ processingDetails.set(Timing.QUEUE, QUEUE_TIME);
+ processingDetails.set(Timing.LOCKFREE, LOCKFREE_TIME);
+ processingDetails.set(Timing.LOCKSHARED, LOCKSHARED_TIME);
+ processingDetails.set(Timing.LOCKEXCLUSIVE, LOCKEXCLUSIVE_TIME);
+ }
+
+ @Test(expected = AssertionError.class) // NOTE(review): presumably relies on JVM assertions being enabled - confirm
+ public void testGetCostBeforeInit() {
+ costProvider.getCost(null); // called without a prior init()
+ }
+
+ @Test
+ public void testGetCostDefaultWeights() {
+ costProvider.init("foo", new Configuration()); // empty configuration: no weights overridden
+ long actualCost = costProvider.getCost(processingDetails);
+ long expectedCost = DEFAULT_LOCKFREE_WEIGHT * LOCKFREE_TIME // note: the expected default cost has no QUEUE_TIME term
+ + DEFAULT_LOCKSHARED_WEIGHT * LOCKSHARED_TIME
+ + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME;
+ assertEquals(expectedCost, actualCost);
+ }
+
+ @Test
+ public void testGetCostConfiguredWeights() {
+ Configuration conf = new Configuration();
+ int queueWeight = 1000;
+ int lockfreeWeight = 10000;
+ int locksharedWeight = 100000;
+ conf.setInt("foo.weighted-cost.queue", queueWeight);
+ conf.setInt("foo.weighted-cost.lockfree", lockfreeWeight);
+ conf.setInt("foo.weighted-cost.lockshared", locksharedWeight);
+ conf.setInt("bar.weighted-cost.lockexclusive", 0); // should not apply: set under "bar", provider is initialized with "foo"
+ costProvider.init("foo", conf);
+ long actualCost = costProvider.getCost(processingDetails);
+ long expectedCost = queueWeight * QUEUE_TIME
+ + lockfreeWeight * LOCKFREE_TIME
+ + locksharedWeight * LOCKSHARED_TIME
+ + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME; // lockexclusive is not configured under "foo", so the default weight is expected
+ assertEquals(expectedCost, actualCost);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org