You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:19 UTC
[18/39] hadoop git commit: HADOOP-13197. Add non-decayed call metrics
for DecayRpcScheduler. Contributed by Xiaoyu Yao.
HADOOP-13197. Add non-decayed call metrics for DecayRpcScheduler. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ca88595
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ca88595
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ca88595
Branch: refs/heads/HDFS-1312
Commit: 4ca8859583839761663fc1fc1de1b3ce2e3fc5b5
Parents: 4fc09a8
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon May 23 16:59:53 2016 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri May 27 18:07:44 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/ipc/DecayRpcScheduler.java | 130 ++++++++++++-------
.../java/org/apache/hadoop/ipc/TestRPC.java | 31 +++--
2 files changed, 99 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ca88595/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 3443d03..ec87c75 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ipc;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -125,12 +126,17 @@ public class DecayRpcScheduler implements RpcScheduler,
public static final Logger LOG =
LoggerFactory.getLogger(DecayRpcScheduler.class);
- // Track the number of calls for each schedulable identity
- private final ConcurrentHashMap<Object, AtomicLong> callCounts =
- new ConcurrentHashMap<Object, AtomicLong>();
+ // Track the decayed and raw (no decay) number of calls for each schedulable
+ // identity from all previous decay windows: idx 0 for decayed call count and
+ // idx 1 for the raw call count
+ private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
+ new ConcurrentHashMap<Object, List<AtomicLong>>();
+
+ // Should be the sum of all AtomicLongs in decayed callCounts
+ private final AtomicLong totalDecayedCallCount = new AtomicLong();
+ // The sum of all AtomicLongs in raw callCounts
+ private final AtomicLong totalRawCallCount = new AtomicLong();
- // Should be the sum of all AtomicLongs in callCounts
- private final AtomicLong totalCalls = new AtomicLong();
// Track total call count and response time in current decay window
private final AtomicLongArray responseTimeCountInCurrWindow;
@@ -155,6 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final long[] backOffResponseTimeThresholds;
private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics
+ private static final double PRECISION = 0.0001;
/**
* This TimerTask will call decayCurrentCounts until
@@ -380,19 +387,23 @@ public class DecayRpcScheduler implements RpcScheduler,
*/
private void decayCurrentCounts() {
try {
- long total = 0;
- Iterator<Map.Entry<Object, AtomicLong>> it =
+ long totalDecayedCount = 0;
+ long totalRawCount = 0;
+ Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry<Object, AtomicLong> entry = it.next();
- AtomicLong count = entry.getValue();
+ Map.Entry<Object, List<AtomicLong>> entry = it.next();
+ AtomicLong decayedCount = entry.getValue().get(0);
+ AtomicLong rawCount = entry.getValue().get(1);
+
// Compute the next value by reducing it by the decayFactor
- long currentValue = count.get();
+ totalRawCount += rawCount.get();
+ long currentValue = decayedCount.get();
long nextValue = (long) (currentValue * decayFactor);
- total += nextValue;
- count.set(nextValue);
+ totalDecayedCount += nextValue;
+ decayedCount.set(nextValue);
if (nextValue == 0) {
// We will clean up unused keys here. An interesting optimization
@@ -403,7 +414,8 @@ public class DecayRpcScheduler implements RpcScheduler,
}
// Update the total so that we remain in sync
- totalCalls.set(total);
+ totalDecayedCallCount.set(totalDecayedCount);
+ totalRawCallCount.set(totalRawCount);
// Now refresh the cache of scheduling decisions
recomputeScheduleCache();
@@ -423,9 +435,9 @@ public class DecayRpcScheduler implements RpcScheduler,
private void recomputeScheduleCache() {
Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
- for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) {
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
Object id = entry.getKey();
- AtomicLong value = entry.getValue();
+ AtomicLong value = entry.getValue().get(0);
long snapshot = value.get();
int computedLevel = computePriorityLevel(snapshot);
@@ -442,27 +454,34 @@ public class DecayRpcScheduler implements RpcScheduler,
* @param identity the identity of the user to increment
* @return the value before incrementation
*/
- private long getAndIncrement(Object identity) throws InterruptedException {
+ private long getAndIncrementCallCounts(Object identity)
+ throws InterruptedException {
// We will increment the count, or create it if no such count exists
- AtomicLong count = this.callCounts.get(identity);
+ List<AtomicLong> count = this.callCounts.get(identity);
if (count == null) {
- // Create the count since no such count exists.
- count = new AtomicLong(0);
+ // Create the counts since no such count exists.
+ // idx 0 for decayed call count
+ // idx 1 for the raw call count
+ count = new ArrayList<AtomicLong>(2);
+ count.add(new AtomicLong(0));
+ count.add(new AtomicLong(0));
// Put it in, or get the AtomicInteger that was put in by another thread
- AtomicLong otherCount = callCounts.putIfAbsent(identity, count);
+ List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
if (otherCount != null) {
count = otherCount;
}
}
// Update the total
- totalCalls.getAndIncrement();
+ totalDecayedCallCount.getAndIncrement();
+ totalRawCallCount.getAndIncrement();
// At this point value is guaranteed to be not null. It may however have
// been clobbered from callCounts. Nonetheless, we return what
// we have.
- return count.getAndIncrement();
+ count.get(1).getAndIncrement();
+ return count.get(0).getAndIncrement();
}
/**
@@ -471,7 +490,7 @@ public class DecayRpcScheduler implements RpcScheduler,
* @return scheduling decision from 0 to numLevels - 1
*/
private int computePriorityLevel(long occurrences) {
- long totalCallSnapshot = totalCalls.get();
+ long totalCallSnapshot = totalDecayedCallCount.get();
double proportion = 0;
if (totalCallSnapshot > 0) {
@@ -497,7 +516,7 @@ public class DecayRpcScheduler implements RpcScheduler,
*/
private int cachedOrComputedPriorityLevel(Object identity) {
try {
- long occurrences = this.getAndIncrement(identity);
+ long occurrences = this.getAndIncrementCallCounts(identity);
// Try the cache
Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
@@ -580,7 +599,7 @@ public class DecayRpcScheduler implements RpcScheduler,
}
}
- // Update the cached average response time at the end of decay window
+ // Update the cached average response time at the end of the decay window
void updateAverageResponseTime(boolean enableDecay) {
for (int i = 0; i < numLevels; i++) {
double averageResponseTime = 0;
@@ -590,11 +609,13 @@ public class DecayRpcScheduler implements RpcScheduler,
averageResponseTime = (double) totalResponseTime / responseTimeCount;
}
final double lastAvg = responseTimeAvgInLastWindow.get(i);
- if (enableDecay && lastAvg > 0.0) {
- final double decayed = decayFactor * lastAvg + averageResponseTime;
- responseTimeAvgInLastWindow.set(i, decayed);
- } else {
- responseTimeAvgInLastWindow.set(i, averageResponseTime);
+ if (lastAvg > PRECISION || averageResponseTime > PRECISION) {
+ if (enableDecay) {
+ final double decayed = decayFactor * lastAvg + averageResponseTime;
+ responseTimeAvgInLastWindow.set(i, decayed);
+ } else {
+ responseTimeAvgInLastWindow.set(i, averageResponseTime);
+ }
}
responseTimeCountInLastWindow.set(i, responseTimeCount);
if (LOG.isDebugEnabled()) {
@@ -624,8 +645,8 @@ public class DecayRpcScheduler implements RpcScheduler,
public Map<Object, Long> getCallCountSnapshot() {
HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
- for (Map.Entry<Object, AtomicLong> entry : callCounts.entrySet()) {
- snapshot.put(entry.getKey(), entry.getValue().get());
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+ snapshot.put(entry.getKey(), entry.getValue().get(0).get());
}
return Collections.unmodifiableMap(snapshot);
@@ -633,7 +654,7 @@ public class DecayRpcScheduler implements RpcScheduler,
@VisibleForTesting
public long getTotalCallSnapshot() {
- return totalCalls.get();
+ return totalDecayedCallCount.get();
}
/**
@@ -750,7 +771,11 @@ public class DecayRpcScheduler implements RpcScheduler,
}
public long getTotalCallVolume() {
- return totalCalls.get();
+ return totalDecayedCallCount.get();
+ }
+
+ public long getTotalRawCallVolume() {
+ return totalRawCallCount.get();
}
public long[] getResponseTimeCountInLastWindow() {
@@ -776,11 +801,12 @@ public class DecayRpcScheduler implements RpcScheduler,
try {
MetricsRecordBuilder rb = collector.addRecord(getClass().getName())
.setContext(namespace);
- addTotalCallVolume(rb);
+ addDecayedCallVolume(rb);
addUniqueIdentityCount(rb);
addTopNCallerSummary(rb);
addAvgResponseTimePerPriority(rb);
addCallVolumePerPriority(rb);
+ addRawCallVolume(rb);
} catch (Exception e) {
LOG.warn("Exception thrown while metric collection. Exception : "
+ e.getMessage());
@@ -793,16 +819,22 @@ public class DecayRpcScheduler implements RpcScheduler,
getUniqueIdentityCount());
}
- // Key: CallVolume
- private void addTotalCallVolume(MetricsRecordBuilder rb) {
- rb.addCounter(Interns.info("CallVolume", "Total Call Volume"),
- getTotalCallVolume());
+ // Key: DecayedCallVolume
+ private void addDecayedCallVolume(MetricsRecordBuilder rb) {
+ rb.addCounter(Interns.info("DecayedCallVolume", "Decayed Total " +
+ "incoming Call Volume"), getTotalCallVolume());
+ }
+
+ private void addRawCallVolume(MetricsRecordBuilder rb) {
+ rb.addCounter(Interns.info("CallVolume", "Raw Total " +
+ "incoming Call Volume"), getTotalRawCallVolume());
}
- // Key: Priority.0.CallVolume
+ // Key: Priority.0.CompletedCallVolume
private void addCallVolumePerPriority(MetricsRecordBuilder rb) {
for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
- rb.addGauge(Interns.info("Priority." + i + ".CallVolume", "Call volume " +
+ rb.addGauge(Interns.info("Priority." + i + ".CompletedCallVolume",
+ "Completed Call volume " +
"of priority "+ i), responseTimeCountInLastWindow.get(i));
}
}
@@ -816,16 +848,14 @@ public class DecayRpcScheduler implements RpcScheduler,
}
}
- // Key: Top.0.Caller(xyz).Volume and Top.0.Caller(xyz).Priority
+ // Key: Caller(xyz).Volume and Caller(xyz).Priority
private void addTopNCallerSummary(MetricsRecordBuilder rb) {
- final int topCallerCount = 10;
- TopN topNCallers = getTopCallers(topCallerCount);
+ TopN topNCallers = getTopCallers(topUsersCount);
Map<Object, Integer> decisions = scheduleCacheRef.get();
final int actualCallerCount = topNCallers.size();
for (int i = 0; i < actualCallerCount; i++) {
NameValuePair entry = topNCallers.poll();
- String topCaller = "Top." + (actualCallerCount - i) + "." +
- "Caller(" + entry.getName() + ")";
+ String topCaller = "Caller(" + entry.getName() + ")";
String topCallerVolume = topCaller + ".Volume";
String topCallerPriority = topCaller + ".Priority";
rb.addCounter(Interns.info(topCallerVolume, topCallerVolume),
@@ -838,15 +868,15 @@ public class DecayRpcScheduler implements RpcScheduler,
}
}
- // Get the top N callers' call count and scheduler decision
+ // Get the top N callers' raw call count and scheduler decision
private TopN getTopCallers(int n) {
TopN topNCallers = new TopN(n);
- Iterator<Map.Entry<Object, AtomicLong>> it =
+ Iterator<Map.Entry<Object, List<AtomicLong>>> it =
callCounts.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry<Object, AtomicLong> entry = it.next();
+ Map.Entry<Object, List<AtomicLong>> entry = it.next();
String caller = entry.getKey().toString();
- Long count = entry.getValue().get();
+ Long count = entry.getValue().get(1).get();
if (count > 0) {
topNCallers.offer(new NameValuePair(caller, count));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ca88595/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 4a869fd..dbc9430 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1029,7 +1029,6 @@ public class TestRPC extends TestRpcBase {
final TestRpcService proxy;
boolean succeeded = false;
final int numClients = 1;
- final int queueSizePerHandler = 3;
GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
@@ -1052,7 +1051,10 @@ public class TestRPC extends TestRpcBase {
MetricsRecordBuilder rb1 =
getMetrics("DecayRpcSchedulerMetrics2." + ns);
- final long beginCallVolume = MetricsAsserts.getLongCounter("CallVolume", rb1);
+ final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
+ "DecayedCallVolume", rb1);
+ final long beginRawCallVolume = MetricsAsserts.getLongCounter(
+ "CallVolume", rb1);
final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
rb1);
@@ -1090,27 +1092,32 @@ public class TestRPC extends TestRpcBase {
public Boolean get() {
MetricsRecordBuilder rb2 =
getMetrics("DecayRpcSchedulerMetrics2." + ns);
- long callVolume1 = MetricsAsserts.getLongCounter("CallVolume", rb2);
- int uniqueCaller1 = MetricsAsserts.getIntCounter("UniqueCallers",
- rb2);
+ long decayedCallVolume1 = MetricsAsserts.getLongCounter(
+ "DecayedCallVolume", rb2);
+ long rawCallVolume1 = MetricsAsserts.getLongCounter(
+ "CallVolume", rb2);
+ int uniqueCaller1 = MetricsAsserts.getIntCounter(
+ "UniqueCallers", rb2);
long callVolumePriority0 = MetricsAsserts.getLongGauge(
- "Priority.0.CallVolume", rb2);
+ "Priority.0.CompletedCallVolume", rb2);
long callVolumePriority1 = MetricsAsserts.getLongGauge(
- "Priority.1.CallVolume", rb2);
+ "Priority.1.CompletedCallVolume", rb2);
double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
"Priority.0.AvgResponseTime", rb2);
double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
"Priority.1.AvgResponseTime", rb2);
- LOG.info("CallVolume1: " + callVolume1);
+ LOG.info("DecayedCallVolume: " + decayedCallVolume1);
+ LOG.info("CallVolume: " + rawCallVolume1);
LOG.info("UniqueCaller: " + uniqueCaller1);
- LOG.info("Priority.0.CallVolume: " + callVolumePriority0);
- LOG.info("Priority.1.CallVolume: " + callVolumePriority1);
+ LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
+ LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
- return callVolume1 > beginCallVolume
- && uniqueCaller1 > beginUniqueCaller;
+ return decayedCallVolume1 > beginDecayedCallVolume &&
+ rawCallVolume1 > beginRawCallVolume &&
+ uniqueCaller1 > beginUniqueCaller;
}
}, 30, 60000);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org