You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:19 UTC

[18/39] hadoop git commit: HADOOP-13197. Add non-decayed call metrics for DecayRpcScheduler. Contributed by Xiaoyu Yao.

HADOOP-13197. Add non-decayed call metrics for DecayRpcScheduler. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ca88595
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ca88595
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ca88595

Branch: refs/heads/HDFS-1312
Commit: 4ca8859583839761663fc1fc1de1b3ce2e3fc5b5
Parents: 4fc09a8
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon May 23 16:59:53 2016 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri May 27 18:07:44 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ipc/DecayRpcScheduler.java    | 130 ++++++++++++-------
 .../java/org/apache/hadoop/ipc/TestRPC.java     |  31 +++--
 2 files changed, 99 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ca88595/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 3443d03..ec87c75 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ipc;
 
 import java.lang.ref.WeakReference;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -125,12 +126,17 @@ public class DecayRpcScheduler implements RpcScheduler,
   public static final Logger LOG =
       LoggerFactory.getLogger(DecayRpcScheduler.class);
 
-  // Track the number of calls for each schedulable identity
-  private final ConcurrentHashMap<Object, AtomicLong> callCounts =
-    new ConcurrentHashMap<Object, AtomicLong>();
+  // Track the decayed and raw (no decay) number of calls for each schedulable
+  // identity from all previous decay windows: idx 0 for decayed call count and
+  // idx 1 for the raw call count
+  private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
+      new ConcurrentHashMap<Object, List<AtomicLong>>();
+
+  // Should be the sum of all AtomicLongs in decayed callCounts
+  private final AtomicLong totalDecayedCallCount = new AtomicLong();
+  // The sum of all AtomicLongs in raw callCounts
+  private final AtomicLong totalRawCallCount = new AtomicLong();
 
-  // Should be the sum of all AtomicLongs in callCounts
-  private final AtomicLong totalCalls = new AtomicLong();
 
   // Track total call count and response time in current decay window
   private final AtomicLongArray responseTimeCountInCurrWindow;
@@ -155,6 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final long[] backOffResponseTimeThresholds;
   private final String namespace;
   private final int topUsersCount; // e.g., report top 10 users' metrics
+  private static final double PRECISION = 0.0001;
 
   /**
    * This TimerTask will call decayCurrentCounts until
@@ -380,19 +387,23 @@ public class DecayRpcScheduler implements RpcScheduler,
    */
   private void decayCurrentCounts() {
     try {
-      long total = 0;
-      Iterator<Map.Entry<Object, AtomicLong>> it =
+      long totalDecayedCount = 0;
+      long totalRawCount = 0;
+      Iterator<Map.Entry<Object, List<AtomicLong>>> it =
           callCounts.entrySet().iterator();
 
       while (it.hasNext()) {
-        Map.Entry<Object, AtomicLong> entry = it.next();
-        AtomicLong count = entry.getValue();
+        Map.Entry<Object, List<AtomicLong>> entry = it.next();
+        AtomicLong decayedCount = entry.getValue().get(0);
+        AtomicLong rawCount = entry.getValue().get(1);
+
 
         // Compute the next value by reducing it by the decayFactor
-        long currentValue = count.get();
+        totalRawCount += rawCount.get();
+        long currentValue = decayedCount.get();
         long nextValue = (long) (currentValue * decayFactor);
-        total += nextValue;
-        count.set(nextValue);
+        totalDecayedCount += nextValue;
+        decayedCount.set(nextValue);
 
         if (nextValue == 0) {
           // We will clean up unused keys here. An interesting optimization
@@ -403,7 +414,8 @@ public class DecayRpcScheduler implements RpcScheduler,
       }
 
       // Update the total so that we remain in sync
-      totalCalls.set(total);
+      totalDecayedCallCount.set(totalDecayedCount);
+      totalRawCallCount.set(totalRawCount);
 
       // Now refresh the cache of scheduling decisions
       recomputeScheduleCache();
@@ -423,9 +435,9 @@ public class DecayRpcScheduler implements RpcScheduler,
   private void recomputeScheduleCache() {
     Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
 
-    for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) {
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
       Object id = entry.getKey();
-      AtomicLong value = entry.getValue();
+      AtomicLong value = entry.getValue().get(0);
 
       long snapshot = value.get();
       int computedLevel = computePriorityLevel(snapshot);
@@ -442,27 +454,34 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @param identity the identity of the user to increment
    * @return the value before incrementation
    */
-  private long getAndIncrement(Object identity) throws InterruptedException {
+  private long getAndIncrementCallCounts(Object identity)
+      throws InterruptedException {
     // We will increment the count, or create it if no such count exists
-    AtomicLong count = this.callCounts.get(identity);
+    List<AtomicLong> count = this.callCounts.get(identity);
     if (count == null) {
-      // Create the count since no such count exists.
-      count = new AtomicLong(0);
+      // Create the counts since no such count exists.
+      // idx 0 for decayed call count
+      // idx 1 for the raw call count
+      count = new ArrayList<AtomicLong>(2);
+      count.add(new AtomicLong(0));
+      count.add(new AtomicLong(0));
 
       // Put it in, or get the AtomicInteger that was put in by another thread
-      AtomicLong otherCount = callCounts.putIfAbsent(identity, count);
+      List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
       if (otherCount != null) {
         count = otherCount;
       }
     }
 
     // Update the total
-    totalCalls.getAndIncrement();
+    totalDecayedCallCount.getAndIncrement();
+    totalRawCallCount.getAndIncrement();
 
     // At this point value is guaranteed to be not null. It may however have
     // been clobbered from callCounts. Nonetheless, we return what
     // we have.
-    return count.getAndIncrement();
+    count.get(1).getAndIncrement();
+    return count.get(0).getAndIncrement();
   }
 
   /**
@@ -471,7 +490,7 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @return scheduling decision from 0 to numLevels - 1
    */
   private int computePriorityLevel(long occurrences) {
-    long totalCallSnapshot = totalCalls.get();
+    long totalCallSnapshot = totalDecayedCallCount.get();
 
     double proportion = 0;
     if (totalCallSnapshot > 0) {
@@ -497,7 +516,7 @@ public class DecayRpcScheduler implements RpcScheduler,
    */
   private int cachedOrComputedPriorityLevel(Object identity) {
     try {
-      long occurrences = this.getAndIncrement(identity);
+      long occurrences = this.getAndIncrementCallCounts(identity);
 
       // Try the cache
       Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
@@ -580,7 +599,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
   }
 
-  // Update the cached average response time at the end of decay window
+  // Update the cached average response time at the end of the decay window
   void updateAverageResponseTime(boolean enableDecay) {
     for (int i = 0; i < numLevels; i++) {
       double averageResponseTime = 0;
@@ -590,11 +609,13 @@ public class DecayRpcScheduler implements RpcScheduler,
         averageResponseTime = (double) totalResponseTime / responseTimeCount;
       }
       final double lastAvg = responseTimeAvgInLastWindow.get(i);
-      if (enableDecay && lastAvg > 0.0) {
-        final double decayed = decayFactor * lastAvg + averageResponseTime;
-        responseTimeAvgInLastWindow.set(i, decayed);
-      } else {
-        responseTimeAvgInLastWindow.set(i, averageResponseTime);
+      if (lastAvg > PRECISION || averageResponseTime > PRECISION) {
+        if (enableDecay) {
+          final double decayed = decayFactor * lastAvg + averageResponseTime;
+          responseTimeAvgInLastWindow.set(i, decayed);
+        } else {
+          responseTimeAvgInLastWindow.set(i, averageResponseTime);
+        }
       }
       responseTimeCountInLastWindow.set(i, responseTimeCount);
       if (LOG.isDebugEnabled()) {
@@ -624,8 +645,8 @@ public class DecayRpcScheduler implements RpcScheduler,
   public Map<Object, Long> getCallCountSnapshot() {
     HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
 
-    for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) {
-      snapshot.put(entry.getKey(), entry.getValue().get());
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+      snapshot.put(entry.getKey(), entry.getValue().get(0).get());
     }
 
     return Collections.unmodifiableMap(snapshot);
@@ -633,7 +654,7 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   @VisibleForTesting
   public long getTotalCallSnapshot() {
-    return totalCalls.get();
+    return totalDecayedCallCount.get();
   }
 
   /**
@@ -750,7 +771,11 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   public long getTotalCallVolume() {
-    return totalCalls.get();
+    return totalDecayedCallCount.get();
+  }
+
+  public long getTotalRawCallVolume() {
+    return totalRawCallCount.get();
   }
 
   public long[] getResponseTimeCountInLastWindow() {
@@ -776,11 +801,12 @@ public class DecayRpcScheduler implements RpcScheduler,
     try {
       MetricsRecordBuilder rb = collector.addRecord(getClass().getName())
           .setContext(namespace);
-      addTotalCallVolume(rb);
+      addDecayedCallVolume(rb);
       addUniqueIdentityCount(rb);
       addTopNCallerSummary(rb);
       addAvgResponseTimePerPriority(rb);
       addCallVolumePerPriority(rb);
+      addRawCallVolume(rb);
     } catch (Exception e) {
       LOG.warn("Exception thrown while metric collection. Exception : "
           + e.getMessage());
@@ -793,16 +819,22 @@ public class DecayRpcScheduler implements RpcScheduler,
         getUniqueIdentityCount());
   }
 
-  // Key: CallVolume
-  private void addTotalCallVolume(MetricsRecordBuilder rb) {
-    rb.addCounter(Interns.info("CallVolume", "Total Call Volume"),
-        getTotalCallVolume());
+  // Key: DecayedCallVolume
+  private void addDecayedCallVolume(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("DecayedCallVolume", "Decayed Total " +
+        "incoming Call Volume"), getTotalCallVolume());
+  }
+
+  private void addRawCallVolume(MetricsRecordBuilder rb) {
+    rb.addCounter(Interns.info("CallVolume", "Raw Total " +
+        "incoming Call Volume"), getTotalRawCallVolume());
   }
 
-  // Key: Priority.0.CallVolume
+  // Key: Priority.0.CompletedCallVolume
   private void addCallVolumePerPriority(MetricsRecordBuilder rb) {
     for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
-      rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " +
+      rb.addGauge(Interns.info("Priority." + i + ".CompletedCallVolume",
+          "Completed Call volume " +
           "of priority "+ i), responseTimeCountInLastWindow.get(i));
     }
   }
@@ -816,16 +848,14 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
   }
 
-  // Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority
+  // Key: Caller(xyz).Volume and Caller(xyz).Priority
   private void addTopNCallerSummary(MetricsRecordBuilder rb) {
-    final int topCallerCount = 10;
-    TopN topNCallers = getTopCallers(topCallerCount);
+    TopN topNCallers = getTopCallers(topUsersCount);
     Map<Object, Integer> decisions = scheduleCacheRef.get();
     final int actualCallerCount = topNCallers.size();
     for (int i = 0; i < actualCallerCount; i++) {
       NameValuePair entry =  topNCallers.poll();
-      String topCaller = "Top." + (actualCallerCount - i) + "." +
-          "Caller(" + entry.getName() + ")";
+      String topCaller = "Caller(" + entry.getName() + ")";
       String topCallerVolume = topCaller + ".Volume";
       String topCallerPriority = topCaller + ".Priority";
       rb.addCounter(Interns.info(topCallerVolume, topCallerVolume),
@@ -838,15 +868,15 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
   }
 
-  // Get the top N callers' call count and scheduler decision
+  // Get the top N callers' raw call count and scheduler decision
   private TopN getTopCallers(int n) {
     TopN topNCallers = new TopN(n);
-    Iterator<Map.Entry<Object, AtomicLong>> it =
+    Iterator<Map.Entry<Object, List<AtomicLong>>> it =
         callCounts.entrySet().iterator();
     while (it.hasNext()) {
-      Map.Entry<Object, AtomicLong> entry = it.next();
+      Map.Entry<Object, List<AtomicLong>> entry = it.next();
       String caller = entry.getKey().toString();
-      Long count = entry.getValue().get();
+      Long count = entry.getValue().get(1).get();
       if (count > 0) {
         topNCallers.offer(new NameValuePair(caller, count));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ca88595/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 4a869fd..dbc9430 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1029,7 +1029,6 @@ public class TestRPC extends TestRpcBase {
     final TestRpcService proxy;
     boolean succeeded = false;
     final int numClients = 1;
-    final int queueSizePerHandler = 3;
 
     GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
@@ -1052,7 +1051,10 @@ public class TestRPC extends TestRpcBase {
 
     MetricsRecordBuilder rb1 =
         getMetrics("DecayRpcSchedulerMetrics2." + ns);
-    final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1);
+    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
+        "DecayedCallVolume", rb1);
+    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
+        "CallVolume", rb1);
     final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
         rb1);
 
@@ -1090,27 +1092,32 @@ public class TestRPC extends TestRpcBase {
           public Boolean get() {
             MetricsRecordBuilder rb2 =
               getMetrics("DecayRpcSchedulerMetrics2." + ns);
-            long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2);
-            int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers",
-              rb2);
+            long decayedCallVolume1 = MetricsAsserts.getLongCounter(
+                "DecayedCallVolume", rb2);
+            long rawCallVolume1 = MetricsAsserts.getLongCounter(
+                "CallVolume", rb2);
+            int uniqueCaller1 = MetricsAsserts.getIntCounter(
+                "UniqueCallers", rb2);
             long callVolumePriority0 = MetricsAsserts.getLongGauge(
-                "Priority.0.CallVolume", rb2);
+                "Priority.0.CompletedCallVolume", rb2);
             long callVolumePriority1 = MetricsAsserts.getLongGauge(
-                "Priority.1.CallVolume", rb2);
+                "Priority.1.CompletedCallVolume", rb2);
             double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
                 "Priority.0.AvgResponseTime", rb2);
             double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
                 "Priority.1.AvgResponseTime", rb2);
 
-            LOG.info("CallVolume1: " + callVolume1);
+            LOG.info("DecayedCallVolume: " + decayedCallVolume1);
+            LOG.info("CallVolume: " + rawCallVolume1);
             LOG.info("UniqueCaller: " + uniqueCaller1);
-            LOG.info("Priority.0.CallVolume: " + callVolumePriority0);
-            LOG.info("Priority.1.CallVolume: " + callVolumePriority1);
+            LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
+            LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
             LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
             LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
 
-            return callVolume1 > beginCallVolume
-                && uniqueCaller1 > beginUniqueCaller;
+            return decayedCallVolume1 > beginDecayedCallVolume &&
+                rawCallVolume1 > beginRawCallVolume &&
+                uniqueCaller1 > beginUniqueCaller;
           }
         }, 30, 60000);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org