You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/06/24 19:09:49 UTC

[hadoop] branch trunk updated: HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 129576f  HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.
129576f is described below

commit 129576f628d370def74e56112aba3a93e97bbf70
Author: Christopher Gregorian <cs...@gmail.com>
AuthorDate: Fri May 24 17:09:52 2019 -0700

    HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.
---
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |   1 +
 .../org/apache/hadoop/ipc/CallQueueManager.java    |   1 -
 .../java/org/apache/hadoop/ipc/CostProvider.java   |  46 ++++
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   | 249 ++++++++++++---------
 .../org/apache/hadoop/ipc/DefaultCostProvider.java |  43 ++++
 .../hadoop/ipc/WeightedTimeCostProvider.java       | 110 +++++++++
 .../src/site/markdown/FairCallQueue.md             |  19 ++
 .../apache/hadoop/ipc/TestDecayRpcScheduler.java   | 174 +++++++++++---
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 102 +++++----
 .../hadoop/ipc/TestWeightedTimeCostProvider.java   |  86 +++++++
 10 files changed, 639 insertions(+), 192 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 958113c..876d0ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -106,6 +106,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
   public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
   public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+  public static final String IPC_COST_PROVIDER_KEY = "cost-provider.impl";
   public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
   public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index e18f307..0287656 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -198,7 +198,6 @@ public class CallQueueManager<E extends Schedulable>
   }
 
   // This should be only called once per call and cached in the call object
-  // each getPriorityLevel call will increment the counter for the caller
   int getPriorityLevel(Schedulable e) {
     return scheduler.getPriorityLevel(e);
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java
new file mode 100644
index 0000000..cf76e7d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by {@link DecayRpcScheduler} to get the cost of users' operations. This
+ * is configurable using
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}.
+ */
+public interface CostProvider {
+
+  /**
+   * Initialize this provider using the given configuration, examining only
+   * ones which fall within the provided namespace.
+   *
+   * @param namespace The namespace to use when looking up configurations.
+   * @param conf The configuration
+   */
+  void init(String namespace, Configuration conf);
+
+  /**
+   * Get cost from {@link ProcessingDetails} which will be used in scheduler.
+   *
+   * @param details Process details
+   * @return The cost of the call
+   */
+  long getCost(ProcessingDetails details);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 38218b2..ffeafb5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -58,8 +58,8 @@ import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
 
 /**
- * The decay RPC scheduler counts incoming requests in a map, then
- * decays the counts at a fixed time interval. The scheduler is optimized
+ * The decay RPC scheduler tracks the cost of incoming requests in a map, then
+ * decays the costs at a fixed time interval. The scheduler is optimized
  * for large periods (on the order of seconds), as it offloads work to the
  * decay sweep.
  */
@@ -77,7 +77,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     "faircallqueue.decay-scheduler.period-ms";
 
   /**
-   * Decay factor controls how much each count is suppressed by on each sweep.
+   * Decay factor controls how much each cost is suppressed by on each sweep.
    * Valid numbers are &gt; 0 and &lt; 1. Decay factor works in tandem with
    * period
    * to control how long the scheduler remembers an identity.
@@ -135,15 +135,15 @@ public class DecayRpcScheduler implements RpcScheduler,
   private static final ObjectWriter WRITER = new ObjectMapper().writer();
 
   // Track the decayed and raw (no decay) number of calls for each schedulable
-  // identity from all previous decay windows: idx 0 for decayed call count and
-  // idx 1 for the raw call count
-  private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
+  // identity from all previous decay windows: idx 0 for decayed call cost and
+  // idx 1 for the raw call cost
+  private final ConcurrentHashMap<Object, List<AtomicLong>> callCosts =
       new ConcurrentHashMap<Object, List<AtomicLong>>();
 
-  // Should be the sum of all AtomicLongs in decayed callCounts
-  private final AtomicLong totalDecayedCallCount = new AtomicLong();
-  // The sum of all AtomicLongs in raw callCounts
-  private final AtomicLong totalRawCallCount = new AtomicLong();
+  // Should be the sum of all AtomicLongs in decayed callCosts
+  private final AtomicLong totalDecayedCallCost = new AtomicLong();
+  // The sum of all AtomicLongs in raw callCosts
+  private final AtomicLong totalRawCallCost = new AtomicLong();
 
 
   // Track total call count and response time in current decay window
@@ -161,7 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   // Tune the behavior of the scheduler
   private final long decayPeriodMillis; // How long between each tick
-  private final double decayFactor; // nextCount = currentCount * decayFactor
+  private final double decayFactor; // nextCost = currentCost * decayFactor
   private final int numLevels;
   private final double[] thresholds;
   private final IdentityProvider identityProvider;
@@ -171,9 +171,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final int topUsersCount; // e.g., report top 10 users' metrics
   private static final double PRECISION = 0.0001;
   private MetricsProxy metricsProxy;
+  private final CostProvider costProvider;
 
   /**
-   * This TimerTask will call decayCurrentCounts until
+   * This TimerTask will call decayCurrentCosts until
    * the scheduler has been garbage collected.
    */
   public static class DecayTask extends TimerTask {
@@ -189,7 +190,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     public void run() {
       DecayRpcScheduler sched = schedulerRef.get();
       if (sched != null) {
-        sched.decayCurrentCounts();
+        sched.decayCurrentCosts();
       } else {
         // Our scheduler was garbage collected since it is no longer in use,
         // so we should terminate the timer as well
@@ -216,6 +217,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     this.decayFactor = parseDecayFactor(ns, conf);
     this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
     this.identityProvider = this.parseIdentityProvider(ns, conf);
+    this.costProvider = this.parseCostProvider(ns, conf);
     this.thresholds = parseThresholds(ns, conf, numLevels);
     this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
         conf);
@@ -243,6 +245,24 @@ public class DecayRpcScheduler implements RpcScheduler,
     recomputeScheduleCache();
   }
 
+  private CostProvider parseCostProvider(String ns, Configuration conf) {
+    List<CostProvider> providers = conf.getInstances(
+        ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+        CostProvider.class);
+
+    if (providers.size() < 1) {
+      LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
+      return new DefaultCostProvider();
+    } else if (providers.size() > 1) {
+      LOG.warn("Found multiple CostProviders; using: {}",
+          providers.get(0).getClass());
+    }
+
+    CostProvider provider = providers.get(0); // use the first
+    provider.init(ns, conf);
+    return provider;
+  }
+
   // Load configs
   private IdentityProvider parseIdentityProvider(String ns,
       Configuration conf) {
@@ -389,69 +409,69 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   /**
-   * Decay the stored counts for each user and clean as necessary.
+   * Decay the stored costs for each user and clean as necessary.
    * This method should be called periodically in order to keep
-   * counts current.
+   * costs current.
    */
-  private void decayCurrentCounts() {
-    LOG.debug("Start to decay current counts.");
+  private void decayCurrentCosts() {
+    LOG.debug("Start to decay current costs.");
     try {
-      long totalDecayedCount = 0;
-      long totalRawCount = 0;
+      long totalDecayedCost = 0;
+      long totalRawCost = 0;
       Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-          callCounts.entrySet().iterator();
+          callCosts.entrySet().iterator();
 
       while (it.hasNext()) {
         Map.Entry<Object, List<AtomicLong>> entry = it.next();
-        AtomicLong decayedCount = entry.getValue().get(0);
-        AtomicLong rawCount = entry.getValue().get(1);
+        AtomicLong decayedCost = entry.getValue().get(0);
+        AtomicLong rawCost = entry.getValue().get(1);
 
 
         // Compute the next value by reducing it by the decayFactor
-        totalRawCount += rawCount.get();
-        long currentValue = decayedCount.get();
+        totalRawCost += rawCost.get();
+        long currentValue = decayedCost.get();
         long nextValue = (long) (currentValue * decayFactor);
-        totalDecayedCount += nextValue;
-        decayedCount.set(nextValue);
+        totalDecayedCost += nextValue;
+        decayedCost.set(nextValue);
 
-        LOG.debug("Decaying counts for the user: {}, " +
-            "its decayedCount: {}, rawCount: {}", entry.getKey(),
-            nextValue, rawCount.get());
+        LOG.debug(
+            "Decaying costs for the user: {}, its decayedCost: {}, rawCost: {}",
+            entry.getKey(), nextValue, rawCost.get());
         if (nextValue == 0) {
-          LOG.debug("The decayed count for the user {} is zero " +
+          LOG.debug("The decayed cost for the user {} is zero " +
               "and being cleaned.", entry.getKey());
           // We will clean up unused keys here. An interesting optimization
-          // might be to have an upper bound on keyspace in callCounts and only
+          // might be to have an upper bound on keyspace in callCosts and only
           // clean once we pass it.
           it.remove();
         }
       }
 
       // Update the total so that we remain in sync
-      totalDecayedCallCount.set(totalDecayedCount);
-      totalRawCallCount.set(totalRawCount);
+      totalDecayedCallCost.set(totalDecayedCost);
+      totalRawCallCost.set(totalRawCost);
 
-      LOG.debug("After decaying the stored counts, totalDecayedCount: {}, " +
-          "totalRawCallCount: {}.", totalDecayedCount, totalRawCount);
+      LOG.debug("After decaying the stored costs, totalDecayedCost: {}, " +
+          "totalRawCallCost: {}.", totalDecayedCost, totalRawCost);
       // Now refresh the cache of scheduling decisions
       recomputeScheduleCache();
 
       // Update average response time with decay
       updateAverageResponseTime(true);
     } catch (Exception ex) {
-      LOG.error("decayCurrentCounts exception: " +
+      LOG.error("decayCurrentCosts exception: " +
           ExceptionUtils.getStackTrace(ex));
       throw ex;
     }
   }
 
   /**
-   * Update the scheduleCache to match current conditions in callCounts.
+   * Update the scheduleCache to match current conditions in callCosts.
    */
   private void recomputeScheduleCache() {
     Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
 
-    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
       Object id = entry.getKey();
       AtomicLong value = entry.getValue().get(0);
 
@@ -466,51 +486,52 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   /**
-   * Get the number of occurrences and increment atomically.
-   * @param identity the identity of the user to increment
-   * @return the value before incrementation
+   * Adjust the stored cost for a given identity.
+   *
+   * @param identity the identity of the user whose cost should be adjusted
+   * @param costDelta the cost to add for the given identity
    */
-  private long getAndIncrementCallCounts(Object identity)
-      throws InterruptedException {
-    // We will increment the count, or create it if no such count exists
-    List<AtomicLong> count = this.callCounts.get(identity);
-    if (count == null) {
-      // Create the counts since no such count exists.
-      // idx 0 for decayed call count
-      // idx 1 for the raw call count
-      count = new ArrayList<AtomicLong>(2);
-      count.add(new AtomicLong(0));
-      count.add(new AtomicLong(0));
+  private void addCost(Object identity, long costDelta) {
+    // We will increment the cost, or create it if no such cost exists
+    List<AtomicLong> cost = this.callCosts.get(identity);
+    if (cost == null) {
+      // Create the costs since no such cost exists.
+      // idx 0 for decayed call cost
+      // idx 1 for the raw call cost
+      cost = new ArrayList<AtomicLong>(2);
+      cost.add(new AtomicLong(0));
+      cost.add(new AtomicLong(0));
 
       // Put it in, or get the AtomicInteger that was put in by another thread
-      List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
-      if (otherCount != null) {
-        count = otherCount;
+      List<AtomicLong> otherCost = callCosts.putIfAbsent(identity, cost);
+      if (otherCost != null) {
+        cost = otherCost;
       }
     }
 
     // Update the total
-    totalDecayedCallCount.getAndIncrement();
-    totalRawCallCount.getAndIncrement();
+    totalDecayedCallCost.getAndAdd(costDelta);
+    totalRawCallCost.getAndAdd(costDelta);
 
     // At this point value is guaranteed to be not null. It may however have
-    // been clobbered from callCounts. Nonetheless, we return what
+    // been clobbered from callCosts. Nonetheless, we return what
     // we have.
-    count.get(1).getAndIncrement();
-    return count.get(0).getAndIncrement();
+    cost.get(1).getAndAdd(costDelta);
+    cost.get(0).getAndAdd(costDelta);
   }
 
   /**
-   * Given the number of occurrences, compute a scheduling decision.
-   * @param occurrences how many occurrences
+   * Given the cost for an identity, compute a scheduling decision.
+   *
+   * @param cost the cost for an identity
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long occurrences) {
-    long totalCallSnapshot = totalDecayedCallCount.get();
+  private int computePriorityLevel(long cost) {
+    long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
     if (totalCallSnapshot > 0) {
-      proportion = (double) occurrences / totalCallSnapshot;
+      proportion = (double) cost / totalCallSnapshot;
     }
 
     // Start with low priority levels, since they will be most common
@@ -531,31 +552,23 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @return integer scheduling decision from 0 to numLevels - 1
    */
   private int cachedOrComputedPriorityLevel(Object identity) {
-    try {
-      long occurrences = this.getAndIncrementCallCounts(identity);
-
-      // Try the cache
-      Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
-      if (scheduleCache != null) {
-        Integer priority = scheduleCache.get(identity);
-        if (priority != null) {
-          LOG.debug("Cache priority for: {} with priority: {}", identity,
-              priority);
-          return priority;
-        }
+    // Try the cache
+    Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
+    if (scheduleCache != null) {
+      Integer priority = scheduleCache.get(identity);
+      if (priority != null) {
+        LOG.debug("Cache priority for: {} with priority: {}", identity,
+            priority);
+        return priority;
       }
-
-      // Cache was no good, compute it
-      int priority = computePriorityLevel(occurrences);
-      LOG.debug("compute priority for " + identity + " priority " + priority);
-      return priority;
-
-    } catch (InterruptedException ie) {
-      LOG.warn("Caught InterruptedException, returning low priority level");
-      LOG.debug("Fallback priority for: {} with priority: {}", identity,
-          numLevels - 1);
-      return numLevels - 1;
     }
+
+    // Cache was no good, compute it
+    List<AtomicLong> costList = callCosts.get(identity);
+    long currentCost = costList == null ? 0 : costList.get(0).get();
+    int priority = computePriorityLevel(currentCost);
+    LOG.debug("compute priority for {} priority {}", identity, priority);
+    return priority;
   }
 
   /**
@@ -605,6 +618,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public void addResponseTime(String callName, Schedulable schedulable,
       ProcessingDetails details) {
+    String user = identityProvider.makeIdentity(schedulable);
+    long processingCost = costProvider.getCost(details);
+    addCost(user, processingCost);
+
     int priorityLevel = schedulable.getPriorityLevel();
     long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
     long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
@@ -652,22 +669,30 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   // For testing
   @VisibleForTesting
-  public double getDecayFactor() { return decayFactor; }
+  double getDecayFactor() {
+    return decayFactor;
+  }
 
   @VisibleForTesting
-  public long getDecayPeriodMillis() { return decayPeriodMillis; }
+  long getDecayPeriodMillis() {
+    return decayPeriodMillis;
+  }
 
   @VisibleForTesting
-  public double[] getThresholds() { return thresholds; }
+  double[] getThresholds() {
+    return thresholds;
+  }
 
   @VisibleForTesting
-  public void forceDecay() { decayCurrentCounts(); }
+  void forceDecay() {
+    decayCurrentCosts();
+  }
 
   @VisibleForTesting
-  public Map<Object, Long> getCallCountSnapshot() {
+  Map<Object, Long> getCallCostSnapshot() {
     HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
 
-    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
       snapshot.put(entry.getKey(), entry.getValue().get(0).get());
     }
 
@@ -675,8 +700,8 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   @VisibleForTesting
-  public long getTotalCallSnapshot() {
-    return totalDecayedCallCount.get();
+  long getTotalCallSnapshot() {
+    return totalDecayedCallCost.get();
   }
 
   /**
@@ -809,15 +834,15 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   public int getUniqueIdentityCount() {
-    return callCounts.size();
+    return callCosts.size();
   }
 
   public long getTotalCallVolume() {
-    return totalDecayedCallCount.get();
+    return totalDecayedCallCost.get();
   }
 
   public long getTotalRawCallVolume() {
-    return totalRawCallCount.get();
+    return totalRawCallCost.get();
   }
 
   public long[] getResponseTimeCountInLastWindow() {
@@ -910,17 +935,17 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
   }
 
-  // Get the top N callers' raw call count and scheduler decision
+  // Get the top N callers' raw call cost and scheduler decision
   private TopN getTopCallers(int n) {
     TopN topNCallers = new TopN(n);
     Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-        callCounts.entrySet().iterator();
+        callCosts.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<Object, List<AtomicLong>> entry = it.next();
       String caller = entry.getKey().toString();
-      Long count = entry.getValue().get(1).get();
-      if (count > 0) {
-        topNCallers.offer(new NameValuePair(caller, count));
+      Long cost = entry.getValue().get(1).get();
+      if (cost > 0) {
+        topNCallers.offer(new NameValuePair(caller, cost));
       }
     }
     return topNCallers;
@@ -941,25 +966,25 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   public String getCallVolumeSummary() {
     try {
-      return WRITER.writeValueAsString(getDecayedCallCounts());
+      return WRITER.writeValueAsString(getDecayedCallCosts());
     } catch (Exception e) {
       return "Error: " + e.getMessage();
     }
   }
 
-  private Map<Object, Long> getDecayedCallCounts() {
-    Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
+  private Map<Object, Long> getDecayedCallCosts() {
+    Map<Object, Long> decayedCallCosts = new HashMap<>(callCosts.size());
     Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-        callCounts.entrySet().iterator();
+        callCosts.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<Object, List<AtomicLong>> entry = it.next();
       Object user = entry.getKey();
-      Long decayedCount = entry.getValue().get(0).get();
-      if (decayedCount > 0) {
-        decayedCallCounts.put(user, decayedCount);
+      Long decayedCost = entry.getValue().get(0).get();
+      if (decayedCost > 0) {
+        decayedCallCosts.put(user, decayedCost);
       }
     }
-    return decayedCallCounts;
+    return decayedCallCosts;
   }
 
   @Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java
new file mode 100644
index 0000000..ad56ddf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Ignores process details and returns a constant value for each call.
+ */
+public class DefaultCostProvider implements CostProvider {
+
+  @Override
+  public void init(String namespace, Configuration conf) {
+    // No-op
+  }
+
+  /**
+   * Returns 1, regardless of the processing details.
+   *
+   * @param details Process details (ignored)
+   * @return 1
+   */
+  @Override
+  public long getCost(ProcessingDetails details) {
+    return 1;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java
new file mode 100644
index 0000000..4304b24
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.Locale;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+
+/**
+ * A {@link CostProvider} that calculates the cost for an operation
+ * as a weighted sum of its processing time values (see
+ * {@link ProcessingDetails}). This can be used by specifying the
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}
+ * configuration key.
+ *
+ * <p/>This allows for configuration of how heavily each of the operations
+ * within {@link ProcessingDetails} is weighted. By default,
+ * {@link ProcessingDetails.Timing#LOCKFREE},
+ * {@link ProcessingDetails.Timing#RESPONSE}, and
+ * {@link ProcessingDetails.Timing#HANDLER} times have a weight of
+ * {@value #DEFAULT_LOCKFREE_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKSHARED} has a weight of
+ * {@value #DEFAULT_LOCKSHARED_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKEXCLUSIVE} has a weight of
+ * {@value #DEFAULT_LOCKEXCLUSIVE_WEIGHT}, and others are ignored.
+ * These values can all be configured using the {@link #WEIGHT_CONFIG_PREFIX}
+ * key, prefixed with the IPC namespace, and suffixed with the name of the
+ * timing measurement from {@link ProcessingDetails} (all lowercase).
+ * For example, to set the lock exclusive weight to be 1000, set:
+ * <pre>
+ *   ipc.8020.cost-provider.impl=org.apache.hadoop.ipc.WeightedTimeCostProvider
+ *   ipc.8020.weighted-cost.lockexclusive=1000
+ * </pre>
+ */
+public class WeightedTimeCostProvider implements CostProvider {
+
+  /**
+   * The prefix used in configuration values specifying the weight to use when
+   * determining the cost of an operation. See the class Javadoc for more info.
+   */
+  public static final String WEIGHT_CONFIG_PREFIX = ".weighted-cost.";
+  static final int DEFAULT_LOCKFREE_WEIGHT = 1;
+  static final int DEFAULT_LOCKSHARED_WEIGHT = 10;
+  static final int DEFAULT_LOCKEXCLUSIVE_WEIGHT = 100;
+
+  private long[] weights;
+
+  @Override
+  public void init(String namespace, Configuration conf) {
+    weights = new long[Timing.values().length];
+    for (Timing timing : ProcessingDetails.Timing.values()) {
+      final int defaultValue;
+      switch (timing) {
+      case LOCKFREE:
+      case RESPONSE:
+      case HANDLER:
+        defaultValue = DEFAULT_LOCKFREE_WEIGHT;
+        break;
+      case LOCKSHARED:
+        defaultValue = DEFAULT_LOCKSHARED_WEIGHT;
+        break;
+      case LOCKEXCLUSIVE:
+        defaultValue = DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+        break;
+      default:
+        // by default don't bill for queueing or lock wait time
+        defaultValue = 0;
+      }
+      String key = namespace + WEIGHT_CONFIG_PREFIX
+          + timing.name().toLowerCase(Locale.ENGLISH);
+      weights[timing.ordinal()] = conf.getInt(key, defaultValue);
+    }
+  }
+
+  /**
+   * Calculates a weighted sum of the times stored on the provided processing
+   * details to be used as the cost in {@link DecayRpcScheduler}.
+   *
+   * @param details Processing details
+   * @return The weighted sum of the times. The returned unit is the same
+   *         as the default unit used by the provided processing details.
+   */
+  @Override
+  public long getCost(ProcessingDetails details) {
+    assert weights != null : "Cost provider must be initialized before use";
+    long cost = 0;
+    // weights was initialized to the same length as Timing.values()
+    for (int i = 0; i < Timing.values().length; i++) {
+      cost += details.get(Timing.values()[i]) * weights[i];
+    }
+    return cost;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
index e62c7ad..22ac05a 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
@@ -91,6 +91,21 @@ This is configurable via the **identity provider**, which defaults to the **User
 provider simply uses the username of the client submitting the request. However, a custom identity provider can be used
 to performing throttling based on other groupings, or using an external identity provider.
 
+### Cost-based Fair Call Queue
+
+Though the fair call queue itself does a good job of mitigating the impact from users who submit a very high _number_
+of requests, it does not take account into how expensive each request is to process. Thus, when considering the
+HDFS NameNode, a user who submits 1000 "getFileInfo" requests would be prioritized the same as a user who submits 1000
+"listStatus" requests on some very large directory, or a user who submits 1000 "mkdir" requests, which are more
+expensive as they require an exclusive lock on the namesystem. To account for the _cost_ of an operation when
+considering the prioritization of user requests, there is a "cost-based" extension to the Fair Call Queue which uses
+the aggregate processing time of a user's operations to determine how that user should be prioritized. By default,
+queue time (time spent waiting to be processed) and lock wait time (time spent waiting to acquire a lock) is not
+considered in the cost, time spent processing without a lock is neutrally (1x) weighted, time spent processing with a
+shared lock is weighted 10x higher, and time spent processing with an exclusive lock is weighted 100x higher.
+This attempts to prioritize users based on the actual load they place on the server. To enable this feature, set the
+`costprovder.impl` configuration to `org.apache.hadoop.ipc.WeightedTimeCostProvider` as described below.
+
 Configuration
 -------------
 
@@ -115,12 +130,16 @@ omitted.
 | scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
 | faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
 | identity-provider.impl | DecayRpcScheduler | The identity provider mapping user requests to their identity. | org.apache.hadoop.ipc.UserIdentityProvider |
+| cost-provider.impl | DecayRpcScheduler | The cost provider mapping user requests to their cost. To enable determination of cost based on processing time, use `org.apache.hadoop.ipc.WeightedTimeCostProvider`. | org.apache.hadoop.ipc.DefaultCostProvider |
 | decay-scheduler.period-ms | DecayRpcScheduler | How frequently the decay factor should be applied to the operation counts of users. Higher values have less overhead, but respond less quickly to changes in client behavior. | 5000 |
 | decay-scheduler.decay-factor | DecayRpcScheduler | When decaying the operation counts of users, the multiplicative decay factor to apply. Higher values will weight older operations more strongly, essentially giving the scheduler a longer memory, and penalizing heavy clients for a longer period of time. | 0.5 |
 | decay-scheduler.thresholds | DecayRpcScheduler | The client load threshold, as an integer percentage, for each priority queue. Clients producing less load, as a percent of total operations, than specified at position _i_ will be given priority _i_. This should be a comma-separated list of length equal to the number of priority levels minus 1 (the last is implicitly 100). | Thresholds ascend by a factor of 2 (e.g., for 4 levels: `13,25,50`) |
 | decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false |
 | decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) |
 | decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 |
+| weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 |
+| weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 |
+| weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 |
 
 ### Example Configuration
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 10ab40a..7bdc6b5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import javax.management.ObjectName;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
 
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
@@ -131,67 +133,69 @@ public class TestDecayRpcScheduler {
     conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
-    assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
+    assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first
 
-    scheduler.getPriorityLevel(mockCall("A"));
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
+    getPriorityIncrementCallCount("A");
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
 
-    scheduler.getPriorityLevel(mockCall("A"));
-    scheduler.getPriorityLevel(mockCall("B"));
-    scheduler.getPriorityLevel(mockCall("A"));
+    getPriorityIncrementCallCount("A");
+    getPriorityIncrementCallCount("B");
+    getPriorityIncrementCallCount("A");
 
-    assertEquals(3, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
   }
 
   @Test
   @SuppressWarnings("deprecation")
   public void testDecay() throws Exception {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    conf.setLong("ns." // Never decay
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
+    conf.setDouble("ns."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
 
     for (int i = 0; i < 4; i++) {
-      scheduler.getPriorityLevel(mockCall("A"));
+      getPriorityIncrementCallCount("A");
     }
 
     sleep(1000);
 
     for (int i = 0; i < 8; i++) {
-      scheduler.getPriorityLevel(mockCall("B"));
+      getPriorityIncrementCallCount("B");
     }
 
     assertEquals(12, scheduler.getTotalCallSnapshot());
-    assertEquals(4, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(8, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(6, scheduler.getTotalCallSnapshot());
-    assertEquals(2, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(4, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(3, scheduler.getTotalCallSnapshot());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(2, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(1, scheduler.getTotalCallSnapshot());
-    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
-    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
-    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
-    assertEquals(null, scheduler.getCallCountSnapshot().get("B"));
+    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+    assertEquals(null, scheduler.getCallCostSnapshot().get("B"));
   }
 
   @Test
@@ -205,16 +209,16 @@ public class TestDecayRpcScheduler {
         .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
     scheduler = new DecayRpcScheduler(4, namespace, conf);
 
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("B")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("B")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
+    assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls
+    assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls
+    assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls
+    assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls
+    assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls
+    assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls
+    assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls
+    assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls
+    assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls
+    assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls
 
     MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
     ObjectName mxbeanName = new ObjectName(
@@ -243,7 +247,7 @@ public class TestDecayRpcScheduler {
     assertEquals(0, scheduler.getTotalCallSnapshot());
 
     for (int i = 0; i < 64; i++) {
-      scheduler.getPriorityLevel(mockCall("A"));
+      getPriorityIncrementCallCount("A");
     }
 
     // It should eventually decay to zero
@@ -272,6 +276,108 @@ public class TestDecayRpcScheduler {
       //set systout back
       System.setOut(output);
     }
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProvider() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(3);
+
+    // 3 details in increasing order of cost. Although medium has a longer
+    // duration, the shared lock is weighted less than the exclusive lock
+    ProcessingDetails callDetailsLow =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1);
+    ProcessingDetails callDetailsMedium =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500);
+    ProcessingDetails callDetailsHigh =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100);
+
+    for (int i = 0; i < 10; i++) {
+      scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow);
+    }
+    scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium);
+    scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh);
+
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+    assertEquals(3, scheduler.getUniqueIdentityCount());
+    long totalCallInitial = scheduler.getTotalRawCallVolume();
+    assertEquals(totalCallInitial, scheduler.getTotalCallVolume());
+
+    scheduler.forceDecay();
+
+    // Relative priorities should stay the same after a single decay
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+    assertEquals(3, scheduler.getUniqueIdentityCount());
+    assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume());
+    assertTrue(scheduler.getTotalCallVolume() < totalCallInitial);
+
+    for (int i = 0; i < 100; i++) {
+      scheduler.forceDecay();
+    }
+    // After enough decay cycles, all callers should be high priority again
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH")));
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+
+    ProcessingDetails emptyDetails =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+
+    for (int i = 0; i < 1000; i++) {
+      scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails);
+    }
+    scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails);
+
+    // Since the calls are all "free", they should have the same priority
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW")));
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProviderNoRequests() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
+  }
+
+  /**
+   * Get a scheduler that uses {@link WeightedTimeCostProvider} and has
+   * normal decaying disabled.
+   */
+  private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
+      int priorityLevels) {
+    Configuration conf = new Configuration();
+    conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+        WeightedTimeCostProvider.class, CostProvider.class);
+    conf.setLong("ns."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
+    return new DecayRpcScheduler(priorityLevels, "ns", conf);
+  }
 
+  /**
+   * Get the priority and increment the call count, assuming that
+   * {@link DefaultCostProvider} is in use.
+   */
+  private int getPriorityIncrementCallCount(String callId) {
+    Schedulable mockCall = mockCall(callId);
+    int priority = scheduler.getPriorityLevel(mockCall);
+    // The DefaultCostProvider uses a cost of 1 for all calls, ignoring
+    // the processing details, so an empty one is fine
+    ProcessingDetails emptyProcessingDetails =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails);
+    return priority;
   }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index d58cc12..232481a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.base.Supplier;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -1195,15 +1194,6 @@ public class TestRPC extends TestRpcBase {
     Exception lastException = null;
     proxy = getClient(addr, conf);
 
-    MetricsRecordBuilder rb1 =
-        getMetrics("DecayRpcSchedulerMetrics2." + ns);
-    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
-        "DecayedCallVolume", rb1);
-    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
-        "CallVolume", rb1);
-    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
-        rb1);
-
     try {
       // start a sleep RPC call that sleeps 3s.
       for (int i = 0; i < numClients; i++) {
@@ -1231,41 +1221,6 @@ public class TestRPC extends TestRpcBase {
         } else {
           lastException = unwrapExeption;
         }
-
-        // Lets Metric system update latest metrics
-        GenericTestUtils.waitFor(new Supplier<Boolean>() {
-          @Override
-          public Boolean get() {
-            MetricsRecordBuilder rb2 =
-              getMetrics("DecayRpcSchedulerMetrics2." + ns);
-            long decayedCallVolume1 = MetricsAsserts.getLongCounter(
-                "DecayedCallVolume", rb2);
-            long rawCallVolume1 = MetricsAsserts.getLongCounter(
-                "CallVolume", rb2);
-            int uniqueCaller1 = MetricsAsserts.getIntCounter(
-                "UniqueCallers", rb2);
-            long callVolumePriority0 = MetricsAsserts.getLongGauge(
-                "Priority.0.CompletedCallVolume", rb2);
-            long callVolumePriority1 = MetricsAsserts.getLongGauge(
-                "Priority.1.CompletedCallVolume", rb2);
-            double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
-                "Priority.0.AvgResponseTime", rb2);
-            double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
-                "Priority.1.AvgResponseTime", rb2);
-
-            LOG.info("DecayedCallVolume: " + decayedCallVolume1);
-            LOG.info("CallVolume: " + rawCallVolume1);
-            LOG.info("UniqueCaller: " + uniqueCaller1);
-            LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
-            LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
-            LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
-            LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
-
-            return decayedCallVolume1 > beginDecayedCallVolume &&
-                rawCallVolume1 > beginRawCallVolume &&
-                uniqueCaller1 > beginUniqueCaller;
-          }
-        }, 30, 60000);
       }
     } finally {
       executorService.shutdown();
@@ -1277,6 +1232,63 @@ public class TestRPC extends TestRpcBase {
     assertTrue("RetriableException not received", succeeded);
   }
 
+  /** Test that the metrics for DecayRpcScheduler are updated. */
+  @Test (timeout=30000)
+  public void testDecayRpcSchedulerMetrics() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+    MetricsRecordBuilder rb1 =
+        getMetrics("DecayRpcSchedulerMetrics2." + ns);
+    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
+        "DecayedCallVolume", rb1);
+    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
+        "CallVolume", rb1);
+    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
+        rb1);
+
+    TestRpcService proxy = getClient(addr, conf);
+    try {
+      for (int i = 0; i < 2; i++) {
+        proxy.sleep(null, newSleepRequest(100));
+      }
+
+      // Lets Metric system update latest metrics
+      GenericTestUtils.waitFor(() -> {
+        MetricsRecordBuilder rb2 =
+            getMetrics("DecayRpcSchedulerMetrics2." + ns);
+        long decayedCallVolume1 = MetricsAsserts.getLongCounter(
+            "DecayedCallVolume", rb2);
+        long rawCallVolume1 = MetricsAsserts.getLongCounter(
+            "CallVolume", rb2);
+        int uniqueCaller1 = MetricsAsserts.getIntCounter(
+            "UniqueCallers", rb2);
+        long callVolumePriority0 = MetricsAsserts.getLongGauge(
+            "Priority.0.CompletedCallVolume", rb2);
+        long callVolumePriority1 = MetricsAsserts.getLongGauge(
+            "Priority.1.CompletedCallVolume", rb2);
+        double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
+            "Priority.0.AvgResponseTime", rb2);
+        double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
+            "Priority.1.AvgResponseTime", rb2);
+
+        LOG.info("DecayedCallVolume: {}", decayedCallVolume1);
+        LOG.info("CallVolume: {}", rawCallVolume1);
+        LOG.info("UniqueCaller: {}", uniqueCaller1);
+        LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0);
+        LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1);
+        LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0);
+        LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1);
+
+        return decayedCallVolume1 > beginDecayedCallVolume &&
+            rawCallVolume1 > beginRawCallVolume &&
+            uniqueCaller1 > beginUniqueCaller;
+      }, 30, 60000);
+    } finally {
+      stop(server, proxy);
+    }
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java
new file mode 100644
index 0000000..4f4a72b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKFREE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKSHARED_WEIGHT;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link WeightedTimeCostProvider}. */
+public class TestWeightedTimeCostProvider {
+
+  private static final int QUEUE_TIME = 3;
+  private static final int LOCKFREE_TIME = 5;
+  private static final int LOCKSHARED_TIME = 7;
+  private static final int LOCKEXCLUSIVE_TIME = 11;
+
+  private WeightedTimeCostProvider costProvider;
+  private ProcessingDetails processingDetails;
+
+  @Before
+  public void setup() {
+    costProvider = new WeightedTimeCostProvider();
+    processingDetails = new ProcessingDetails(TimeUnit.MILLISECONDS);
+    processingDetails.set(Timing.QUEUE, QUEUE_TIME);
+    processingDetails.set(Timing.LOCKFREE, LOCKFREE_TIME);
+    processingDetails.set(Timing.LOCKSHARED, LOCKSHARED_TIME);
+    processingDetails.set(Timing.LOCKEXCLUSIVE, LOCKEXCLUSIVE_TIME);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testGetCostBeforeInit() {
+    costProvider.getCost(null);
+  }
+
+  @Test
+  public void testGetCostDefaultWeights() {
+    costProvider.init("foo", new Configuration());
+    long actualCost = costProvider.getCost(processingDetails);
+    long expectedCost = DEFAULT_LOCKFREE_WEIGHT * LOCKFREE_TIME
+        + DEFAULT_LOCKSHARED_WEIGHT * LOCKSHARED_TIME
+        + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME;
+    assertEquals(expectedCost, actualCost);
+  }
+
+  @Test
+  public void testGetCostConfiguredWeights() {
+    Configuration conf = new Configuration();
+    int queueWeight = 1000;
+    int lockfreeWeight = 10000;
+    int locksharedWeight = 100000;
+    conf.setInt("foo.weighted-cost.queue", queueWeight);
+    conf.setInt("foo.weighted-cost.lockfree", lockfreeWeight);
+    conf.setInt("foo.weighted-cost.lockshared", locksharedWeight);
+    conf.setInt("bar.weighted-cost.lockexclusive", 0); // should not apply
+    costProvider.init("foo", conf);
+    long actualCost = costProvider.getCost(processingDetails);
+    long expectedCost = queueWeight * QUEUE_TIME
+        + lockfreeWeight * LOCKFREE_TIME
+        + locksharedWeight * LOCKSHARED_TIME
+        + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME;
+    assertEquals(expectedCost, actualCost);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org