You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/25 18:23:49 UTC
[06/50] [abbrv] hadoop git commit: HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA
HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0f29926
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0f29926
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0f29926
Branch: refs/heads/YARN-2928
Commit: f0f299268625af275522f55d5bfc43118c31bdd8
Parents: 2fd02af
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Feb 19 17:30:07 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Feb 19 17:30:07 2015 +0000
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++
.../metrics2/impl/MetricsSinkAdapter.java | 15 +++++-
.../hadoop-common/src/site/markdown/Metrics.md | 2 +-
.../metrics2/impl/TestMetricsSystemImpl.java | 50 ++++++++++++++++++++
4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index c01e3d6..8d3f9f5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -973,6 +973,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11595. Add default implementation for AbstractFileSystem#truncate.
(yliu)
+ HADOOP-9087. Queue size metric for metric sinks isn't actually maintained
+ (Akira AJISAKA via jlowe)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index 9add494..ed52317 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
if (logicalTime % period == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTime);
- if (queue.enqueue(buffer)) return true;
+ if (queue.enqueue(buffer)) {
+ refreshQueueSizeGauge();
+ return true;
+ }
dropped.incr();
return false;
}
@@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
public boolean putMetricsImmediate(MetricsBuffer buffer) {
WaitableMetricsBuffer waitableBuffer =
new WaitableMetricsBuffer(buffer);
- if (!queue.enqueue(waitableBuffer)) {
+ if (queue.enqueue(waitableBuffer)) {
+ refreshQueueSizeGauge();
+ } else {
LOG.warn(name + " has a full queue and can't consume the given metrics.");
dropped.incr();
return false;
@@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
while (!stopping) {
try {
queue.consumeAll(this);
+ refreshQueueSizeGauge();
retryDelay = firstRetryDelay;
n = retryCount;
inError = false;
@@ -151,12 +157,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
"suppressing further error messages", e);
}
queue.clear();
+ refreshQueueSizeGauge();
inError = true; // Don't keep complaining ad infinitum
}
}
}
}
+ private void refreshQueueSizeGauge() {
+ qsize.set(queue.size());
+ }
+
@Override
public void consume(MetricsBuffer buffer) {
long ts = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index dbcf0d8..6953c3b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -434,7 +434,7 @@ MetricsSystem shows the statistics for metrics snapshots and publishes. Each met
| `Sink_`*instance*`NumOps` | Total number of sink operations for the *instance* |
| `Sink_`*instance*`AvgTime` | Average time in milliseconds of sink operations for the *instance* |
| `Sink_`*instance*`Dropped` | Total number of dropped sink operations for the *instance* |
-| `Sink_`*instance*`Qsize` | Current queue length of sink operations (BUT always set to 0 because nothing to increment this metrics, see [HADOOP-9941](https://issues.apache.org/jira/browse/HADOOP-9941)) |
+| `Sink_`*instance*`Qsize` | Current queue length of sink operations |
default context
===============
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index 4c2ebc8..0f7b15f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -29,7 +29,9 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -434,6 +436,54 @@ public class TestMetricsSystemImpl {
new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
}
+ @Test
+ public void testQSize() throws Exception {
+ new ConfigBuilder().add("*.period", 8)
+ .add("test.sink.test.class", TestSink.class.getName())
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ final CountDownLatch proceedSignal = new CountDownLatch(1);
+ final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1);
+ ms.start();
+ try {
+ MetricsSink slowSink = mock(MetricsSink.class);
+ MetricsSink dataSink = mock(MetricsSink.class);
+ ms.registerSink("slowSink",
+ "The sink that will wait on putMetric", slowSink);
+ ms.registerSink("dataSink",
+ "The sink I'll use to get info about slowSink", dataSink);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ reachedPutMetricSignal.countDown();
+ proceedSignal.await();
+ return null;
+ }
+ }).when(slowSink).putMetrics(any(MetricsRecord.class));
+
+ // trigger metric collection first time
+ ms.onTimerEvent();
+ assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS));
+ // Now that the slow sink is still processing the first metric,
+ // its queue length should be 1 for the second collection.
+ ms.onTimerEvent();
+ verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
+ List<MetricsRecord> mr = r1.getAllValues();
+ Number qSize = Iterables.find(mr.get(1).metrics(),
+ new Predicate<AbstractMetric>() {
+ @Override
+ public boolean apply(@Nullable AbstractMetric input) {
+ assert input != null;
+ return input.name().equals("Sink_slowSinkQsize");
+ }
+ }).value();
+ assertEquals(1, qSize);
+ } finally {
+ proceedSignal.countDown();
+ ms.stop();
+ }
+ }
+
@Metrics(context="test")
private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1;