You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/12/06 22:51:37 UTC
svn commit: r1418101 - in /hadoop/common/branches/branch-1: ./
src/core/org/apache/hadoop/metrics2/
src/core/org/apache/hadoop/metrics2/impl/
src/core/org/apache/hadoop/metrics2/lib/
src/test/org/apache/hadoop/metrics2/impl/
Author: suresh
Date: Thu Dec 6 21:51:36 2012
New Revision: 1418101
URL: http://svn.apache.org/viewvc?rev=1418101&view=rev
Log:
HADOOP-9090. Support on-demand publish of metrics. Contributed by Mostafa Elhemali.
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Dec 6 21:51:36 2012
@@ -42,6 +42,9 @@ Release 1.2.0 - unreleased
HDFS-4219. Backport slive to branch-1. (Backported by Arpit Gupta via
suresh)
+ HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
+ suresh)
+
IMPROVEMENTS
HDFS-3515. Port HDFS-1457 to branch-1. (eli)
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/MetricsSystem.java Thu Dec 6 21:51:36 2012
@@ -52,6 +52,17 @@ public interface MetricsSystem extends M
void register(Callback callback);
/**
+ * Requests an immediate publish of all metrics from sources to sinks.
+ *
+ * This is a "soft" request: the expectation is that a best effort will be
+ * done to synchronously snapshot the metrics from all the sources and put
+ * them in all the sinks (including flushing the sinks) before returning to
+ * the caller. If this can't be accomplished in reasonable time it's OK to
+ * return to the caller before everything is done.
+ */
+ public abstract void publishMetricsNow();
+
+ /**
* Shutdown the metrics system completely (usually during server shutdown.)
* The MetricsSystemMXBean will be unregistered.
*/
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Thu Dec 6 21:51:36 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.metrics2.impl;
import java.util.Random;
+import java.util.concurrent.*;
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -28,6 +29,7 @@ import org.apache.hadoop.metrics2.util.C
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.Metric;
import org.apache.hadoop.metrics2.MetricsFilter;
import org.apache.hadoop.metrics2.MetricsSink;
@@ -45,6 +47,7 @@ class MetricsSinkAdapter {
private volatile boolean stopping = false;
private volatile boolean inError = false;
private final int period, firstRetryDelay, retryCount;
+ private final long oobPutTimeout;
private final float retryBackoff;
private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
private final MetricMutableStat latency;
@@ -75,6 +78,8 @@ class MetricsSinkAdapter {
Contracts.checkArg(retryDelay, retryDelay > 0, "retry delay");
this.retryBackoff =
Contracts.checkArg(retryBackoff, retryBackoff > 1, "backoff factor");
+ oobPutTimeout = (long)
+ (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
this.retryCount = retryCount;
this.queue = new SinkQueue<MetricsBuffer>(
Contracts.checkArg(queueCapacity, queueCapacity > 0, "queue capacity"));
@@ -93,6 +98,23 @@ class MetricsSinkAdapter {
sinkThread.setDaemon(true);
}
+ /**
+ * Hands a metrics snapshot to this sink immediately, without the
+ * logicalTime/period check that putMetrics applies, and waits for the
+ * sink thread to process it.
+ *
+ * The buffer is wrapped in a WaitableMetricsBuffer; the sink thread signals
+ * it once it is done with the buffer, and this method waits at most
+ * oobPutTimeout milliseconds for that signal.
+ *
+ * @param buffer the metrics snapshot to publish
+ * @return true if the sink signalled completion in time; false if the queue
+ * was full (counted as dropped) or the wait timed out / was interrupted
+ */
+ public boolean putMetricsImmediate(MetricsBuffer buffer) {
+ final WaitableMetricsBuffer pending = new WaitableMetricsBuffer(buffer);
+ final boolean enqueued = queue.enqueue(pending);
+ if (enqueued) {
+ // The buffer is now owned by the sink thread; block until it reports back.
+ final boolean consumed = pending.waitTillNotified(oobPutTimeout);
+ if (!consumed) {
+ LOG.warn(name +
+ " couldn't fulfill an immediate putMetrics request in time." +
+ " Abandoning.");
+ }
+ return consumed;
+ }
+ LOG.warn(name + " has a full queue and can't consume the given metrics.");
+ dropped.incr();
+ return false;
+ }
+
boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
if (logicalTime % period == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTime);
@@ -167,6 +189,9 @@ class MetricsSinkAdapter {
sink.flush();
latency.add(System.currentTimeMillis() - ts);
}
+ if (buffer instanceof WaitableMetricsBuffer) {
+ ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
+ }
LOG.debug("Done");
}
@@ -202,4 +227,26 @@ class MetricsSinkAdapter {
return sink;
}
+ /**
+ * A MetricsBuffer that lets the publishing thread block until the sink
+ * thread has finished with it.
+ */
+ static class WaitableMetricsBuffer extends MetricsBuffer {
+ // Starts with zero permits, so a waiter blocks until notifyAnyWaiters()
+ // releases one.
+ private final Semaphore notificationSemaphore =
+ new Semaphore(0);
+
+ public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
+ super(metricsBuffer);
+ }
+
+ /**
+ * Waits until notified or until the timeout elapses.
+ * @param millisecondsToWait maximum time to wait, in milliseconds
+ * @return true if notified in time; false on timeout or interruption
+ */
+ public boolean waitTillNotified(long millisecondsToWait) {
+ try {
+ return notificationSemaphore.tryAcquire(millisecondsToWait,
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers further up the stack can
+ // still observe and honor the interruption.
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ /**
+ * Releases one permit: wakes a thread blocked in waitTillNotified, or lets
+ * the next call to it return immediately.
+ */
+ public void notifyAnyWaiters() {
+ notificationSemaphore.release();
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Thu Dec 6 21:51:36 2012
@@ -306,9 +306,19 @@ public class MetricsSystemImpl implement
synchronized void onTimerEvent() {
logicalTime += period;
if (sinks.size() > 0) {
- publishMetrics(snapshotMetrics());
+ publishMetrics(snapshotMetrics(), false);
}
}
+
+ /**
+ * Requests an immediate publish of all metrics from sources to sinks.
+ *
+ * Synchronized like onTimerEvent so that sinks is read under the object
+ * lock and the snapshot/publish pair cannot interleave with a timer-driven
+ * publish.
+ */
+ @Override
+ public synchronized void publishMetricsNow() {
+ if (sinks.size() > 0) {
+ publishMetrics(snapshotMetrics(), true);
+ }
+ }
/**
* snapshot all the sources for a snapshot of metrics/tags
@@ -342,12 +352,20 @@ public class MetricsSystemImpl implement
/**
* Publish a metrics snapshot to all the sinks
* @param buffer the metrics snapshot to publish
+ * @param immediate indicates that we should publish metrics immediately
+ * instead of using a separate thread.
*/
- synchronized void publishMetrics(MetricsBuffer buffer) {
+ synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) {
int dropped = 0;
for (MetricsSinkAdapter sa : sinks.values()) {
long startTime = System.currentTimeMillis();
- dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
+ boolean result;
+ if (immediate) {
+ result = sa.putMetricsImmediate(buffer);
+ } else {
+ result = sa.putMetrics(buffer, logicalTime);
+ }
+ dropped += result ? 0 : 1;
publishStat.add(System.currentTimeMillis() - startTime);
}
dropStat.incr(dropped);
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Thu Dec 6 21:51:36 2012
@@ -76,6 +76,11 @@ public enum DefaultMetricsSystem impleme
impl.register(callback);
}
+ /**
+ * Requests an immediate publish of all metrics by delegating to the
+ * wrapped metrics system implementation.
+ */
+ @Override
+ public void publishMetricsNow() {
+ this.impl.publishMetricsNow();
+ }
+
public void start() {
impl.start();
}
@@ -95,5 +100,4 @@ public enum DefaultMetricsSystem impleme
public void shutdown() {
impl.shutdown();
}
-
}
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java?rev=1418101&r1=1418100&r2=1418101&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Thu Dec 6 21:51:36 2012
@@ -23,7 +23,10 @@ import org.apache.hadoop.metrics2.lib.Me
import org.apache.hadoop.metrics2.lib.MetricMutableStat;
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
import org.apache.hadoop.metrics2.lib.AbstractMetricsSource;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
@@ -52,7 +55,7 @@ public class TestMetricsSystemImpl {
private static String hostname = MetricsSystemImpl.getHostname();
@Test public void testInitFirst() throws Exception {
- ConfigBuilder cb = new ConfigBuilder().add("default.period", 8)
+ new ConfigBuilder().add("default.period", 8)
.add("source.filter.class",
"org.apache.hadoop.metrics2.filter.GlobFilter")
.add("test.*.source.filter.class", "${source.filter.class}")
@@ -74,8 +77,9 @@ public class TestMetricsSystemImpl {
ms.register("sink1", "sink1 desc", sink1);
ms.register("sink2", "sink2 desc", sink2);
ms.register("sink3", "sink3 desc", sink3);
- ms.onTimerEvent(); // trigger something interesting
+ ms.publishMetricsNow(); // publish the metrics
ms.stop();
+ ms.shutdown();
verify(sink1, times(3)).putMetrics(r1.capture()); // 2 + 1 sys source
List<MetricsRecord> mr1 = r1.getAllValues();
@@ -88,6 +92,178 @@ public class TestMetricsSystemImpl {
checkMetricsRecords(mr3, "s3rec");
}
+ @Test public void testMultiThreadedPublish() throws Exception {
+ new ConfigBuilder().add("*.period", 80)
+ .add("test.sink.Collector.queue.capacity", "20")
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ ms.start();
+ try {
+ final int numThreads = 10;
+ final CollectingSink sink = new CollectingSink(numThreads);
+ ms.registerSink("Collector",
+ "Collector of values from all threads.", sink);
+ final TestSource[] sources = new TestSource[numThreads];
+ final Thread[] threads = new Thread[numThreads];
+ final String[] results = new String[numThreads];
+ // barrier1: all threads start publishing at the same time.
+ // barrier2: no thread checks its metric until every thread has published.
+ final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
+ barrier2 = new CyclicBarrier(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ sources[i] = ms.register("threadSource" + i,
+ "A source of my threaded goodness.",
+ new TestSource("threadSourceRec" + i));
+ threads[i] = new Thread(new Runnable() {
+ /**
+ * Waits (at most 2 seconds) on the given barrier. On failure, records
+ * the reason in results[mySource] and returns false.
+ */
+ private boolean safeAwait(int mySource, CyclicBarrier barrier) {
+ try {
+ barrier.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ results[mySource] = "Interrupted";
+ return false;
+ } catch (BrokenBarrierException e) {
+ results[mySource] = "Broken Barrier";
+ return false;
+ } catch (TimeoutException e) {
+ results[mySource] = "Timed out on barrier";
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void run() {
+ // Each thread is named after its index (see the Thread constructor).
+ int mySource = Integer.parseInt(Thread.currentThread().getName());
+ if (sink.collected[mySource].get() != 0L) {
+ results[mySource] = "Someone else collected my metric!";
+ return;
+ }
+ // There is a race between setting the source value and
+ // which thread takes a snapshot first. Set the value here
+ // before any thread starts publishing so they all start
+ // with the right value.
+ sources[mySource].g1.set(230);
+ // Wait for all the threads to come here so we can hammer
+ // the system at the same time
+ if (!safeAwait(mySource, barrier1)) return;
+ ms.publishMetricsNow();
+ // Since some other thread may have snatched my metric,
+ // I need to wait for the threads to finish before checking.
+ if (!safeAwait(mySource, barrier2)) return;
+ if (sink.collected[mySource].get() != 230L) {
+ results[mySource] = "Metric not collected!";
+ return;
+ }
+ results[mySource] = "Passed";
+ }
+ }, "" + i);
+ }
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ boolean pass = true;
+ StringBuilder allResults = new StringBuilder();
+ for (String r : results) {
+ allResults.append(r).append("\n");
+ // A null entry means the thread died before recording a result.
+ pass = pass && "Passed".equalsIgnoreCase(r);
+ }
+ assertTrue(allResults.toString(), pass);
+ } finally {
+ // Always release the metrics system, even when an assertion fails.
+ ms.stop();
+ ms.shutdown();
+ }
+ }
+
+ /**
+ * Sink that stores, for each "threadSourceRec<N>" record, the value of its
+ * g1 metric in collected[N], so each test thread can check its own metric.
+ */
+ private static class CollectingSink implements MetricsSink {
+ private final AtomicLong[] collected;
+
+ public CollectingSink(int capacity) {
+ collected = new AtomicLong[capacity];
+ for (int i = 0; i < capacity; i++) {
+ collected[i] = new AtomicLong();
+ }
+ }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ final String prefix = "threadSourceRec";
+ if (!record.name().startsWith(prefix)) {
+ return; // not one of the per-thread test sources
+ }
+ final int recordNumber = Integer.parseInt(
+ record.name().substring(prefix.length()));
+ for (Metric m : record.metrics()) {
+ if (m.name().equalsIgnoreCase("g1")) {
+ collected[recordNumber].set(m.value().longValue());
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+ }
+
+ @Test public void testHangingSink() {
+ // retry.count=0 and retry.delay=1 keep the immediate-publish wait short.
+ new ConfigBuilder().add("*.period", 8)
+ .add("test.sink.hanging.retry.delay", "1")
+ .add("test.sink.hanging.retry.backoff", "1.01")
+ .add("test.sink.hanging.retry.count", "0")
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ MetricsSystemImpl metricsSystem = new MetricsSystemImpl("Test");
+ metricsSystem.start();
+ TestSource source =
+ metricsSystem.register("s3", "s3 desc", new TestSource("s3rec"));
+ source.c1.incr();
+ HangingSink hangingSink = new HangingSink();
+ metricsSystem.registerSink("hanging", "Hang the sink!", hangingSink);
+ // The sink sleeps on its first record, so this call is expected to give up
+ // waiting rather than block for the whole sleep.
+ metricsSystem.publishMetricsNow();
+ // Nothing has interrupted the sleeping sink yet.
+ assertFalse(hangingSink.getInterrupted());
+ metricsSystem.stop();
+ metricsSystem.shutdown();
+ // Stopping is expected to interrupt the hung sink, after which it should
+ // still receive the remaining records.
+ assertTrue(hangingSink.getInterrupted());
+ assertTrue("The sink didn't get called after its first hang " +
+ "for subsequent records.", hangingSink.getGotCalledSecondTime());
+ }
+
+ /**
+ * Sink whose first putMetrics call sleeps for 10 seconds (recording whether
+ * the sleep was interrupted) and whose later calls only record that they
+ * happened.
+ */
+ private static class HangingSink implements MetricsSink {
+ // Both flags are written from putMetrics (the sink thread) and read by the
+ // test thread, so both must be volatile for the writes to be visible.
+ private volatile boolean interrupted;
+ private volatile boolean gotCalledSecondTime;
+ // Only read/written inside putMetrics; presumably always called from the
+ // single sink thread -- confirm if sinks can be shared.
+ private boolean firstTime = true;
+
+ public boolean getGotCalledSecondTime() {
+ return gotCalledSecondTime;
+ }
+
+ public boolean getInterrupted() {
+ return interrupted;
+ }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ // No need to hang every time, just the first record.
+ if (!firstTime) {
+ gotCalledSecondTime = true;
+ return;
+ }
+ firstTime = false;
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ex) {
+ // Recorded for the test's assertions; the interrupt status is not
+ // re-asserted so delivery of the remaining records is unaffected.
+ interrupted = true;
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+ }
+
static void checkMetricsRecords(List<MetricsRecord> recs, String expected) {
LOG.debug(recs);
MetricsRecord r = recs.get(0);