You are viewing a plain-text version of this content. (The link to the canonical version, labeled "here" in the original page, is not preserved in plain text.)
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/12/06 22:51:37 UTC

svn commit: r1418101 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/metrics2/ src/core/org/apache/hadoop/metrics2/impl/ src/core/org/apache/hadoop/metrics2/lib/ src/test/org/apache/hadoop/metrics2/impl/

Author: suresh
Date: Thu Dec  6 21:51:36 2012
New Revision: 1418101

URL: http://svn.apache.org/viewvc?rev=1418101&view=rev
Log:
HADOOP-9090. Support on-demand publish of metrics. Contributed by Mostafa Elhemali.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Dec  6 21:51:36 2012
@@ -42,6 +42,9 @@ Release 1.2.0 - unreleased
     HDFS-4219. Backport slive to branch-1. (Backported by Arpit Gupta via
     suresh)
 
+    HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
+    suresh)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java Thu Dec  6 21:51:36 2012
@@ -52,6 +52,17 @@ public interface MetricsSystem extends M
   void register(Callback callback);
 
   /**
+   * Requests an immediate publish of all metrics from sources to sinks.
+   *
+   * <p>This is a "soft" request: the expectation is that a best effort will
+   * be made to synchronously snapshot the metrics from all the sources and
+   * put them in all the sinks (including flushing the sinks) before returning
+   * to the caller. If this can't be accomplished in a reasonable time, it's
+   * OK to return to the caller before everything is done.
+   */
+  void publishMetricsNow();
+
+  /**
    * Shutdown the metrics system completely (usually during server shutdown.)
    * The MetricsSystemMXBean will be unregistered.
    */

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Thu Dec  6 21:51:36 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.metrics2.impl;
 
 import java.util.Random;
+import java.util.concurrent.*;
 import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -28,6 +29,7 @@ import org.apache.hadoop.metrics2.util.C
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.metrics2.Metric;
 import org.apache.hadoop.metrics2.MetricsFilter;
 import org.apache.hadoop.metrics2.MetricsSink;
 
@@ -45,6 +47,7 @@ class MetricsSinkAdapter {
   private volatile boolean stopping = false;
   private volatile boolean inError = false;
   private final int period, firstRetryDelay, retryCount;
+  private final long oobPutTimeout;
   private final float retryBackoff;
   private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
   private final MetricMutableStat latency;
@@ -75,6 +78,8 @@ class MetricsSinkAdapter {
         Contracts.checkArg(retryDelay, retryDelay > 0, "retry delay");
     this.retryBackoff =
         Contracts.checkArg(retryBackoff, retryBackoff > 1, "backoff factor");
+    oobPutTimeout = (long)
+        (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
     this.retryCount = retryCount;
     this.queue = new SinkQueue<MetricsBuffer>(
         Contracts.checkArg(queueCapacity, queueCapacity > 0, "queue capacity"));
@@ -93,6 +98,34 @@ class MetricsSinkAdapter {
     sinkThread.setDaemon(true);
   }
 
+  /**
+   * Enqueues the given metrics for this sink and blocks until they have
+   * been consumed, or until {@code oobPutTimeout} milliseconds elapse.
+   * On timeout the buffer is left in the queue, so the sink may still
+   * receive it later.
+   *
+   * @param buffer  the metrics snapshot to publish
+   * @return true if the sink consumed the buffer in time; false if the
+   *         queue was full (the buffer is dropped), the wait timed out, or
+   *         the calling thread was interrupted
+   */
+  public boolean putMetricsImmediate(MetricsBuffer buffer) {
+    WaitableMetricsBuffer waitableBuffer =
+        new WaitableMetricsBuffer(buffer);
+    if (!queue.enqueue(waitableBuffer)) {
+      LOG.warn(name + " has a full queue and can't consume the given metrics.");
+      dropped.incr();
+      return false;
+    }
+    if (!waitableBuffer.waitTillNotified(oobPutTimeout)) {
+      LOG.warn(name +
+          " couldn't fulfill an immediate putMetrics request in time." +
+          " Abandoning.");
+      return false;
+    }
+    return true;
+  }
+
   boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
     if (logicalTime % period == 0) {
       LOG.debug("enqueue, logicalTime="+ logicalTime);
@@ -167,6 +200,9 @@ class MetricsSinkAdapter {
       sink.flush();
       latency.add(System.currentTimeMillis() - ts);
     }
+    if (buffer instanceof WaitableMetricsBuffer) {
+      ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
+    }
     LOG.debug("Done");
   }
 
@@ -202,4 +238,40 @@ class MetricsSinkAdapter {
     return sink;
   }
 
+  /**
+   * A {@link MetricsBuffer} that lets the publishing thread wait until the
+   * sink adapter has finished consuming it.
+   */
+  static class WaitableMetricsBuffer extends MetricsBuffer {
+    private final Semaphore notificationSemaphore =
+        new Semaphore(0);
+
+    public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
+      super(metricsBuffer);
+    }
+
+    /**
+     * Waits until {@link #notifyAnyWaiters()} has been called.
+     *
+     * @param millisecondsToWait  maximum time to wait, in milliseconds
+     * @return true if notified in time; false on timeout or interruption
+     *         (in which case the thread's interrupt status is restored)
+     */
+    public boolean waitTillNotified(long millisecondsToWait) {
+      try {
+        return notificationSemaphore.tryAcquire(millisecondsToWait,
+            TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        // Don't swallow the interrupt: let the caller observe it.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+
+    /** Releases the thread waiting in {@link #waitTillNotified(long)}. */
+    public void notifyAnyWaiters() {
+      notificationSemaphore.release();
+    }
+  }
+
 }

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Thu Dec  6 21:51:36 2012
@@ -306,9 +306,23 @@ public class MetricsSystemImpl implement
   synchronized void onTimerEvent() {
     logicalTime += period;
     if (sinks.size() > 0) {
-      publishMetrics(snapshotMetrics());
+      publishMetrics(snapshotMetrics(), false);
     }
   }
+
+  /**
+   * Requests an immediate publish of all metrics from sources to sinks.
+   *
+   * <p>Synchronized, like {@link #onTimerEvent()}, so that checking the
+   * sinks, snapshotting and publishing all happen under the same lock. The
+   * lock is held while waiting for each sink to consume the snapshot.
+   */
+  @Override
+  public synchronized void publishMetricsNow() {
+    if (sinks.size() > 0) {
+      publishMetrics(snapshotMetrics(), true);
+    }
+  }
 
   /**
    * snapshot all the sources for a snapshot of metrics/tags
@@ -342,12 +356,20 @@ public class MetricsSystemImpl implement
   /**
    * Publish a metrics snapshot to all the sinks
    * @param buffer  the metrics snapshot to publish
+   * @param immediate  if true, block until each sink has consumed the
+   *                   snapshot (or timed out) instead of using putMetrics
    */
-  synchronized void publishMetrics(MetricsBuffer buffer) {
+  synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) {
     int dropped = 0;
     for (MetricsSinkAdapter sa : sinks.values()) {
       long startTime = System.currentTimeMillis();
-      dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
+      boolean result;
+      if (immediate) {
+        result = sa.putMetricsImmediate(buffer);
+      } else {
+        result = sa.putMetrics(buffer, logicalTime);
+      }
+      dropped += result ? 0 : 1;
       publishStat.add(System.currentTimeMillis() - startTime);
     }
     dropStat.incr(dropped);

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Thu Dec  6 21:51:36 2012
@@ -76,6 +76,12 @@ public enum DefaultMetricsSystem impleme
     impl.register(callback);
   }
 
+  /** Requests an immediate publish of all metrics from sources to sinks. */
+  @Override
+  public void publishMetricsNow() {
+    impl.publishMetricsNow();
+  }
+
   public void start() {
     impl.start();
   }
@@ -95,5 +101,4 @@ public enum DefaultMetricsSystem impleme
   public void shutdown() {
     impl.shutdown();
   }
-
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Thu Dec  6 21:51:36 2012
@@ -23,7 +23,10 @@ import org.apache.hadoop.metrics2.lib.Me
 import org.apache.hadoop.metrics2.lib.MetricMutableStat;
 import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.AbstractMetricsSource;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+ 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
@@ -52,7 +55,7 @@ public class TestMetricsSystemImpl {
   private static String hostname = MetricsSystemImpl.getHostname();
 
   @Test public void testInitFirst() throws Exception {
-    ConfigBuilder cb = new ConfigBuilder().add("default.period", 8)
+    new ConfigBuilder().add("default.period", 8)
         .add("source.filter.class",
              "org.apache.hadoop.metrics2.filter.GlobFilter")
         .add("test.*.source.filter.class", "${source.filter.class}")
@@ -74,8 +77,9 @@ public class TestMetricsSystemImpl {
     ms.register("sink1", "sink1 desc", sink1);
     ms.register("sink2", "sink2 desc", sink2);
     ms.register("sink3", "sink3 desc", sink3);
-    ms.onTimerEvent();  // trigger something interesting
+    ms.publishMetricsNow(); // publish the metrics
     ms.stop();
+    ms.shutdown();
 
     verify(sink1, times(3)).putMetrics(r1.capture()); // 2 + 1 sys source
     List<MetricsRecord> mr1 = r1.getAllValues();
@@ -88,6 +92,177 @@ public class TestMetricsSystemImpl {
     checkMetricsRecords(mr3, "s3rec");
   }
 
+  @Test public void testMultiThreadedPublish() throws Exception {
+    new ConfigBuilder().add("*.period", 80)
+      .add("test.sink.Collector.queue.capacity", "20")
+      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    final int numThreads = 10;
+    final CollectingSink sink = new CollectingSink(numThreads);
+    ms.registerSink("Collector",
+        "Collector of values from all threads.", sink);
+    final TestSource[] sources = new TestSource[numThreads];
+    final Thread[] threads = new Thread[numThreads];
+    final String[] results = new String[numThreads];
+    final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
+        barrier2 = new CyclicBarrier(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      sources[i] = ms.register("threadSource" + i,
+          "A source of my threaded goodness.",
+          new TestSource("threadSourceRec" + i));
+      threads[i] = new Thread(new Runnable() {
+        // Awaits the given barrier; on failure records the reason in results.
+        private boolean safeAwait(int mySource, CyclicBarrier barrier) {
+          try {
+            barrier.await(2, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            results[mySource] = "Interrupted";
+            return false;
+          } catch (BrokenBarrierException e) {
+            results[mySource] = "Broken Barrier";
+            return false;
+          } catch (TimeoutException e) {
+            results[mySource] = "Timed out on barrier";
+            return false;
+          }
+          return true;
+        }
+        
+        @Override
+        public void run() {
+          int mySource = Integer.parseInt(Thread.currentThread().getName());
+          if (sink.collected[mySource].get() != 0L) {
+            results[mySource] = "Someone else collected my metric!";
+            return;
+          }
+          // There is a race between setting the source value and
+          // which thread takes a snapshot first. Set the value here
+          // before any thread starts publishing so they all start
+          // with the right value.
+          sources[mySource].g1.set(230);
+          // Wait for all the threads to come here so we can hammer
+          // the system at the same time
+          if (!safeAwait(mySource, barrier1)) return;
+          ms.publishMetricsNow();
+          // Since some other thread may have snatched my metric,
+          // I need to wait for the threads to finish before checking.
+          if (!safeAwait(mySource, barrier2)) return;
+          if (sink.collected[mySource].get() != 230L) {
+            results[mySource] = "Metric not collected!";
+            return;
+          }
+          results[mySource] = "Passed";
+        }
+      }, "" + i);
+    }
+    for (Thread t : threads)
+      t.start();
+    for (Thread t : threads)
+      t.join();
+    boolean pass = true;
+    StringBuilder allResults = new StringBuilder();
+    for (String r : results) {
+      allResults.append(r).append('\n');
+      pass = pass && "Passed".equalsIgnoreCase(r);
+    }
+    assertTrue(allResults.toString(), pass);
+    ms.stop();
+    ms.shutdown();
+  }
+
+  private static class CollectingSink implements MetricsSink {
+    private final AtomicLong[] collected;
+    
+    public CollectingSink(int capacity) {
+      collected = new AtomicLong[capacity];
+      for (int i = 0; i < capacity; i++) {
+        collected[i] = new AtomicLong();
+      }
+    }
+    
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      final String prefix = "threadSourceRec";
+      if (record.name().startsWith(prefix)) {
+        final int recordNumber = Integer.parseInt(
+            record.name().substring(prefix.length()));
+        for (Metric m : record.metrics()) {
+          if (m.name().equalsIgnoreCase("g1")) {
+            collected[recordNumber].set(m.value().longValue());
+            return;
+          }
+        }
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
+  @Test public void testHangingSink() {
+    new ConfigBuilder().add("*.period", 8)
+      .add("test.sink.hanging.retry.delay", "1")
+      .add("test.sink.hanging.retry.backoff", "1.01")
+      .add("test.sink.hanging.retry.count", "0")
+      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec"));
+    s.c1.incr();
+    HangingSink hanging = new HangingSink();
+    ms.registerSink("hanging", "Hang the sink!", hanging);
+    ms.publishMetricsNow();
+    assertFalse(hanging.getInterrupted());
+    ms.stop();
+    ms.shutdown();
+    assertTrue(hanging.getInterrupted());
+    assertTrue("The sink didn't get called after its first hang " +
+               "for subsequent records.", hanging.getGotCalledSecondTime());
+  }
+
+  private static class HangingSink implements MetricsSink {
+    private volatile boolean interrupted;
+    private volatile boolean gotCalledSecondTime;
+    private boolean firstTime = true;
+
+    public boolean getGotCalledSecondTime() {
+      return gotCalledSecondTime;
+    }
+
+    public boolean getInterrupted() {
+      return interrupted;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      // No need to hang every time, just the first record.
+      if (!firstTime) {
+        gotCalledSecondTime = true;
+        return;
+      }
+      firstTime = false;
+      try {
+        Thread.sleep(10 * 1000);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
   static void checkMetricsRecords(List<MetricsRecord> recs, String expected) {
     LOG.debug(recs);
     MetricsRecord r = recs.get(0);