You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/02/19 18:30:40 UTC

hadoop git commit: HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2fd02afec -> f0f299268


HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0f29926
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0f29926
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0f29926

Branch: refs/heads/trunk
Commit: f0f299268625af275522f55d5bfc43118c31bdd8
Parents: 2fd02af
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Feb 19 17:30:07 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Feb 19 17:30:07 2015 +0000

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../metrics2/impl/MetricsSinkAdapter.java       | 15 +++++-
 .../hadoop-common/src/site/markdown/Metrics.md  |  2 +-
 .../metrics2/impl/TestMetricsSystemImpl.java    | 50 ++++++++++++++++++++
 4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index c01e3d6..8d3f9f5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -973,6 +973,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11595. Add default implementation for AbstractFileSystem#truncate.
     (yliu)
 
+    HADOOP-9087. Queue size metric for metric sinks isn't actually maintained
+    (Akira AJISAKA via jlowe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index 9add494..ed52317 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
   boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
     if (logicalTime % period == 0) {
       LOG.debug("enqueue, logicalTime="+ logicalTime);
-      if (queue.enqueue(buffer)) return true;
+      if (queue.enqueue(buffer)) {
+        refreshQueueSizeGauge();
+        return true;
+      }
       dropped.incr();
       return false;
     }
@@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
   public boolean putMetricsImmediate(MetricsBuffer buffer) {
     WaitableMetricsBuffer waitableBuffer =
         new WaitableMetricsBuffer(buffer);
-    if (!queue.enqueue(waitableBuffer)) {
+    if (queue.enqueue(waitableBuffer)) {
+      refreshQueueSizeGauge();
+    } else {
       LOG.warn(name + " has a full queue and can't consume the given metrics.");
       dropped.incr();
       return false;
@@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
     while (!stopping) {
       try {
         queue.consumeAll(this);
+        refreshQueueSizeGauge();
         retryDelay = firstRetryDelay;
         n = retryCount;
         inError = false;
@@ -151,12 +157,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
                       "suppressing further error messages", e);
           }
           queue.clear();
+          refreshQueueSizeGauge();
           inError = true; // Don't keep complaining ad infinitum
         }
       }
     }
   }
 
+  private void refreshQueueSizeGauge() {
+    qsize.set(queue.size());
+  }
+
   @Override
   public void consume(MetricsBuffer buffer) {
     long ts = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index dbcf0d8..6953c3b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -434,7 +434,7 @@ MetricsSystem shows the statistics for metrics snapshots and publishes. Each met
 | `Sink_`*instance*`NumOps` | Total number of sink operations for the *instance* |
 | `Sink_`*instance*`AvgTime` | Average time in milliseconds of sink operations for the *instance* |
 | `Sink_`*instance*`Dropped` | Total number of dropped sink operations for the *instance* |
-| `Sink_`*instance*`Qsize` | Current queue length of sink operations  (BUT always set to 0 because nothing to increment this metrics, see [HADOOP-9941](https://issues.apache.org/jira/browse/HADOOP-9941)) |
+| `Sink_`*instance*`Qsize` | Current queue length of sink operations |
 
 default context
 ===============

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index 4c2ebc8..0f7b15f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -29,7 +29,9 @@ import org.junit.runner.RunWith;
 
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -434,6 +436,54 @@ public class TestMetricsSystemImpl {
                new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
   }
 
+  @Test
+  public void testQSize() throws Exception {
+    new ConfigBuilder().add("*.period", 8)
+        .add("test.sink.test.class", TestSink.class.getName())
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    final CountDownLatch proceedSignal = new CountDownLatch(1);
+    final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1);
+    ms.start();
+    try {
+      MetricsSink slowSink = mock(MetricsSink.class);
+      MetricsSink dataSink = mock(MetricsSink.class);
+      ms.registerSink("slowSink",
+          "The sink that will wait on putMetric", slowSink);
+      ms.registerSink("dataSink",
+          "The sink I'll use to get info about slowSink", dataSink);
+      doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          reachedPutMetricSignal.countDown();
+          proceedSignal.await();
+          return null;
+        }
+      }).when(slowSink).putMetrics(any(MetricsRecord.class));
+
+      // trigger metric collection first time
+      ms.onTimerEvent();
+      assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS));
+      // Now that the slow sink is still processing the first metric,
+      // its queue length should be 1 for the second collection.
+      ms.onTimerEvent();
+      verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
+      List<MetricsRecord> mr = r1.getAllValues();
+      Number qSize = Iterables.find(mr.get(1).metrics(),
+          new Predicate<AbstractMetric>() {
+            @Override
+            public boolean apply(@Nullable AbstractMetric input) {
+              assert input != null;
+              return input.name().equals("Sink_slowSinkQsize");
+            }
+      }).value();
+      assertEquals(1, qSize);
+    } finally {
+      proceedSignal.countDown();
+      ms.stop();
+    }
+  }
+
   @Metrics(context="test")
   private static class TestSource {
     @Metric("C1 desc") MutableCounterLong c1;