You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 13:59:18 UTC
[1/3] flink git commit: [FLINK-4093] Expose metric interfaces
Repository: flink
Updated Branches:
refs/heads/master 62cb954d9 -> ee3c7a88b
[FLINK-4093] Expose metric interfaces
This closes #2134
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d43bf8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d43bf8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d43bf8d9
Branch: refs/heads/master
Commit: d43bf8d9b3085d1341bfca61e05c2a77e5426226
Parents: 62cb954
Author: zentol <ch...@apache.org>
Authored: Wed Jun 22 10:37:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 15:23:27 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/metrics/Counter.java | 24 ++-----
.../java/org/apache/flink/metrics/Gauge.java | 5 +-
.../org/apache/flink/metrics/MetricGroup.java | 32 +++++++--
.../org/apache/flink/metrics/SimpleCounter.java | 71 ++++++++++++++++++++
.../metrics/groups/AbstractMetricGroup.java | 16 ++++-
.../groups/UnregisteredMetricsGroup.java | 19 ++++--
.../flink/runtime/taskmanager/TaskManager.scala | 41 ++++++-----
.../partition/consumer/InputChannelTest.java | 4 +-
8 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index acc37cf..ffb1cc7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving;
* A Counter is a {@link Metric} that measures a count.
*/
@PublicEvolving
-public final class Counter implements Metric {
-
- private long count;
+public interface Counter extends Metric {
/**
* Increment the current count by 1.
*/
- public void inc() {
- count++;
- }
+ void inc();
/**
* Increment the current count by the given value.
*
* @param n value to increment the current count by
*/
- public void inc(long n) {
- count += n;
- }
+ void inc(long n);
/**
* Decrement the current count by 1.
*/
- public void dec() {
- count--;
- }
+ void dec();
/**
* Decrement the current count by the given value.
*
* @param n value to decrement the current count by
*/
- public void dec(long n) {
- count -= n;
- }
+ void dec(long n);
/**
* Returns the current count.
*
* @return current count
*/
- public long getCount() {
- return count;
- }
+ long getCount();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index aad8deb..740645d 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving;
* A Gauge is a {@link Metric} that calculates a specific value at a point in time.
*/
@PublicEvolving
-public abstract class Gauge<T> implements Metric {
-
+public interface Gauge<T> extends Metric {
/**
* Calculates and returns the measured value.
*
* @return calculated value
*/
- public abstract T getValue();
+ T getValue();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 6c9e044..b131949 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -63,7 +63,7 @@ public interface MetricGroup {
* Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
- * @return the registered counter
+ * @return the created counter
*/
Counter counter(int name);
@@ -71,19 +71,39 @@ public interface MetricGroup {
* Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
*
* @param name name of the counter
- * @return the registered counter
+ * @return the created counter
*/
Counter counter(String name);
/**
+ * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+ *
+ * @param name name of the counter
+ * @param counter counter to register
+ * @param <C> counter type
+ * @return the given counter
+ */
+ <C extends Counter> C counter(int name, C counter);
+
+ /**
+ * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+ *
+ * @param name name of the counter
+ * @param counter counter to register
+ * @param <C> counter type
+ * @return the given counter
+ */
+ <C extends Counter> C counter(String name, C counter);
+
+ /**
* Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
*
* @param name name of the gauge
* @param gauge gauge to register
* @param <T> return type of the gauge
- * @return the registered gauge
+ * @return the given gauge
*/
- <T> Gauge<T> gauge(int name, Gauge<T> gauge);
+ <T, G extends Gauge<T>> G gauge(int name, G gauge);
/**
* Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
@@ -91,9 +111,9 @@ public interface MetricGroup {
* @param name name of the gauge
* @param gauge gauge to register
* @param <T> return type of the gauge
- * @return the registered gauge
+ * @return the given gauge
*/
- <T> Gauge<T> gauge(String name, Gauge<T> gauge);
+ <T, G extends Gauge<T>> G gauge(String name, G gauge);
// ------------------------------------------------------------------------
// Groups
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
new file mode 100644
index 0000000..9720b08
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * Minimal {@link org.apache.flink.metrics.Counter} backed by a single {@code long} field.
+ *
+ * <p>Updates are plain, unsynchronized field writes, so this counter is cheap but not
+ * thread-safe.
+ */
+public class SimpleCounter implements Counter {
+
+    /** Running total; modified only by the inc/dec methods of this class. */
+    private long count;
+
+    /**
+     * Adds one to the current count.
+     */
+    @Override
+    public void inc() {
+        this.count += 1;
+    }
+
+    /**
+     * Adds the given amount to the current count.
+     *
+     * @param n amount to add to the current count
+     */
+    @Override
+    public void inc(long n) {
+        this.count = this.count + n;
+    }
+
+    /**
+     * Subtracts one from the current count.
+     */
+    @Override
+    public void dec() {
+        this.count -= 1;
+    }
+
+    /**
+     * Subtracts the given amount from the current count.
+     *
+     * @param n amount to subtract from the current count
+     */
+    @Override
+    public void dec(long n) {
+        this.count = this.count - n;
+    }
+
+    /**
+     * Reads the current count.
+     *
+     * @return the count accumulated so far
+     */
+    @Override
+    public long getCount() {
+        return this.count;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 032fa04..93eb734 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.scope.ScopeFormat;
import org.slf4j.Logger;
@@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup {
@Override
public Counter counter(String name) {
- Counter counter = new Counter();
+ return counter(name, new SimpleCounter());
+ }
+
+ @Override
+ public <C extends Counter> C counter(int name, C counter) {
+ return counter(String.valueOf(name), counter);
+ }
+
+ @Override
+ public <C extends Counter> C counter(String name, C counter) {
addMetric(name, counter);
return counter;
}
@Override
- public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+ public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
return gauge(String.valueOf(name), gauge);
}
@Override
- public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
addMetric(name, gauge);
return gauge;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 961bcce..29d71d9 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
/**
* A special {@link MetricGroup} that does not register any metrics at the metrics registry
@@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup {
@Override
public Counter counter(int name) {
- return new Counter();
+ return new SimpleCounter();
}
@Override
public Counter counter(String name) {
- return new Counter();
+ return new SimpleCounter();
}
@Override
- public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+ public <C extends Counter> C counter(int name, C counter) {
+ return counter;
+ }
+
+ @Override
+ public <C extends Counter> C counter(String name, C counter) {
+ return counter;
+ }
+
+ @Override
+ public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
return gauge;
}
@Override
- public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
return gauge;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ef22af..1fb0e09 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2294,11 +2294,10 @@ object TaskManager {
private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getClassLoadingMXBean
- metrics
- .gauge("ClassesLoaded", new FlinkGauge[Long] {
+ metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getTotalLoadedClassCount
})
- metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] {
+ metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getUnloadedClassCount
})
}
@@ -2308,10 +2307,10 @@ object TaskManager {
for (garbageCollector <- garbageCollectors) {
val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
- gcGroup.gauge("Count", new FlinkGauge[Long] {
+ gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = garbageCollector.getCollectionCount
})
- gcGroup.gauge("Time", new FlinkGauge[Long] {
+ gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
override def getValue: Long = garbageCollector.getCollectionTime
})
}
@@ -2320,24 +2319,24 @@ object TaskManager {
private def instantiateMemoryMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getMemoryMXBean
val heap = metrics.addGroup("Heap")
- heap.gauge("Used", new FlinkGauge[Long] {
+ heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
})
- heap.gauge("Committed", new FlinkGauge[Long] {
+ heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
})
- heap.gauge("Max", new FlinkGauge[Long] {
+ heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
})
val nonHeap = metrics.addGroup("NonHeap")
- nonHeap.gauge("Used", new FlinkGauge[Long] {
+ nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
})
- nonHeap.gauge("Committed", new FlinkGauge[Long] {
+ nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
})
- nonHeap.gauge("Max", new FlinkGauge[Long] {
+ nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
})
@@ -2346,15 +2345,15 @@ object TaskManager {
val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
val direct = metrics.addGroup("Direct")
- direct.gauge("Count", new FlinkGauge[Long] {
+ direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "Count").asInstanceOf[Long]
})
- direct.gauge("MemoryUsed", new FlinkGauge[Long] {
+ direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
})
- direct.gauge("TotalCapacity", new FlinkGauge[Long] {
+ direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
})
@@ -2362,15 +2361,15 @@ object TaskManager {
// The "Mapped" group must read the mapped buffer pool MXBean, not the direct one.
val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
val mapped = metrics.addGroup("Mapped")
- mapped.gauge("Count", new FlinkGauge[Long] {
+ mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
})
- mapped.gauge("MemoryUsed", new FlinkGauge[Long] {
+ mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
})
- mapped.gauge("TotalCapacity", new FlinkGauge[Long] {
+ mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
override def getValue: Long = con
.getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
})
@@ -2379,8 +2378,7 @@ object TaskManager {
private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
val mxBean = ManagementFactory.getThreadMXBean
- metrics
- .gauge("Count", new FlinkGauge[Int] {
+ metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
override def getValue: Int = mxBean.getThreadCount
})
}
@@ -2390,11 +2388,10 @@ object TaskManager {
val mxBean = ManagementFactory.getOperatingSystemMXBean
.asInstanceOf[com.sun.management.OperatingSystemMXBean]
- metrics
- .gauge("Load", new FlinkGauge[Double] {
+ metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
override def getValue: Double = mxBean.getProcessCpuLoad
})
- metrics.gauge("Time", new FlinkGauge[Long] {
+ metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
override def getValue: Long = mxBean.getProcessCpuTime
})
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index da15f08..0868398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -119,7 +119,7 @@ public class InputChannelTest {
ResultPartitionID partitionId,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
- super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
+ super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter());
}
@Override
[2/3] flink git commit: [FLINK-3951] Add Histogram metric type
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
new file mode 100644
index 0000000..2479c26
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
+
+    /**
+     * Checks count, minimum, maximum, size and median of a DropwizardHistogramWrapper that is
+     * backed by a sliding window reservoir, first while the window fills up and then while
+     * older values drop out of it.
+     */
+    @Test
+    public void testDropwizardHistogramWrapper() {
+        final int windowSize = 10;
+        final DropwizardHistogramWrapper wrapper = new DropwizardHistogramWrapper(
+            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(windowSize)));
+
+        // phase 1: values 0..windowSize-1, every value is still inside the window
+        int next = 0;
+        while (next < windowSize) {
+            wrapper.update(next);
+
+            assertEquals(next + 1, wrapper.getCount());
+            assertEquals(next, wrapper.getStatistics().getMax());
+            assertEquals(0, wrapper.getStatistics().getMin());
+
+            next++;
+        }
+
+        assertEquals(windowSize, wrapper.getStatistics().size());
+        assertEquals((windowSize - 1)/2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+
+        // phase 2: values windowSize..2*windowSize-1, the minimum moves with the window
+        while (next < 2 * windowSize) {
+            wrapper.update(next);
+
+            assertEquals(next + 1, wrapper.getCount());
+            assertEquals(next, wrapper.getStatistics().getMax());
+            assertEquals(next + 1 - windowSize, wrapper.getStatistics().getMin());
+
+            next++;
+        }
+
+        assertEquals(windowSize, wrapper.getStatistics().size());
+        assertEquals(windowSize + (windowSize - 1)/2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+    }
+
+    /**
+     * Tests that the DropwizardHistogramWrapper reports correct dropwizard snapshots to the
+     * ScheduledReporter.
+     *
+     * <p>A {@link TestingReporter} is configured as the registry's reporter, a histogram is
+     * registered and filled with the values {@code 0..size-1}, and the snapshot delivered by
+     * the next report is checked. Finally the histogram is unregistered again and the test
+     * verifies that the reporter no longer knows it.
+     *
+     * @throws Exception if reflection on the registry fails or no snapshot arrives in time
+     */
+    @Test
+    public void testDropwizardHistogramWrapperReporting() throws Exception {
+        long reportingInterval = 1000;
+        long timeout = 30000;
+        int size = 10;
+        String histogramMetricName = "histogram";
+        Configuration config = new Configuration();
+        config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName());
+        config.setString(KEY_METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS");
+
+        MetricRegistry registry = null;
+
+        try {
+            registry = new MetricRegistry(config);
+            DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
+                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
+
+            TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+            metricGroup.histogram(histogramMetricName, histogramWrapper);
+
+            String fullMetricName = metricGroup.getScopeString() + "." + histogramMetricName;
+
+            // the reporter instance is read reflectively from the registry's "reporter" field
+            Field f = registry.getClass().getDeclaredField("reporter");
+            f.setAccessible(true);
+
+            MetricReporter reporter = (MetricReporter) f.get(registry);
+
+            assertTrue(reporter instanceof TestingReporter);
+
+            TestingReporter testingReporter = (TestingReporter) reporter;
+
+            TestingScheduledReporter scheduledReporter = testingReporter.scheduledReporter;
+
+            // check that the metric has been registered
+            assertEquals(1, testingReporter.getMetrics().size());
+
+            for (int i = 0; i < size; i++) {
+                histogramWrapper.update(i);
+            }
+
+            // request the snapshot only after all updates so that it contains every value
+            Future<Snapshot> snapshotFuture = scheduledReporter.getNextHistogramSnapshot(fullMetricName);
+
+            Snapshot snapshot = snapshotFuture.get(timeout, TimeUnit.MILLISECONDS);
+
+            assertEquals(0, snapshot.getMin());
+            assertEquals((size - 1) / 2.0, snapshot.getMedian(), 0.001);
+            assertEquals(size - 1, snapshot.getMax());
+            assertEquals(size, snapshot.size());
+
+            // unregister under the same name that was used for registration
+            registry.unregister(histogramWrapper, histogramMetricName, metricGroup);
+
+            // check that the metric has been de-registered
+            assertEquals(0, testingReporter.getMetrics().size());
+        } finally {
+            if (registry != null) {
+                registry.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Reporter used by this test: it builds a {@link TestingScheduledReporter} and keeps a
+     * reference to it so the test can ask for histogram snapshots.
+     */
+    public static class TestingReporter extends ScheduledDropwizardReporter {
+        /** The reporter created by {@link #getReporter(Configuration)}; null until then. */
+        TestingScheduledReporter scheduledReporter = null;
+
+        /**
+         * Creates the scheduled reporter, stores it in {@link #scheduledReporter} and returns it.
+         *
+         * @param config reporter configuration (not used here)
+         * @return the newly created testing reporter
+         */
+        @Override
+        public ScheduledReporter getReporter(Configuration config) {
+            final TestingScheduledReporter created = new TestingScheduledReporter(
+                registry,
+                getClass().getName(),
+                null,
+                TimeUnit.MILLISECONDS,
+                TimeUnit.MILLISECONDS);
+
+            this.scheduledReporter = created;
+            return created;
+        }
+
+        /**
+         * Exposes the metrics currently held by the dropwizard registry of this reporter.
+         *
+         * @return the registry's metrics keyed by name
+         */
+        public Map<String, com.codahale.metrics.Metric> getMetrics() {
+            return registry.getMetrics();
+        }
+    }
+
+    /**
+     * Scheduled reporter that records the latest snapshot of every reported histogram and
+     * completes the futures handed out by {@link #getNextHistogramSnapshot(String)}.
+     */
+    static class TestingScheduledReporter extends ScheduledReporter {
+
+        /** Latest snapshot per histogram name. */
+        final Map<String, Snapshot> histogramSnapshots = new HashMap<>();
+        /** Futures waiting for the next report of a histogram; also used as the lock guarding itself. */
+        final Map<String, List<CompletableFuture<Snapshot>>> histogramSnapshotFutures = new HashMap<>();
+
+        protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) {
+            super(registry, name, filter, rateUnit, durationUnit);
+        }
+
+        /**
+         * Forwards every histogram of the report to {@link #reportHistogram}; all other metric
+         * types are ignored.
+         */
+        @Override
+        public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, com.codahale.metrics.Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+            for (String histogramName: histograms.keySet()) {
+                reportHistogram(histogramName, histograms.get(histogramName));
+            }
+        }
+
+        /**
+         * Stores a snapshot of the histogram and completes all futures waiting under its name.
+         *
+         * @param name name the histogram was reported under
+         * @param histogram reported histogram
+         */
+        void reportHistogram(String name, com.codahale.metrics.Histogram histogram) {
+            histogramSnapshots.put(name, histogram.getSnapshot());
+
+            synchronized (histogramSnapshotFutures) {
+                // removing the list makes sure each future is completed by exactly one report
+                final List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.remove(name);
+
+                if (waiting == null) {
+                    return;
+                }
+
+                for (CompletableFuture<Snapshot> pending: waiting) {
+                    pending.complete(histogram.getSnapshot());
+                }
+            }
+        }
+
+        /**
+         * Returns a future that is completed with a snapshot of the named histogram the next
+         * time that histogram is reported.
+         *
+         * @param name name of the histogram to wait for
+         * @return future for the next reported snapshot
+         */
+        Future<Snapshot> getNextHistogramSnapshot(String name) {
+            synchronized (histogramSnapshotFutures) {
+                List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.get(name);
+                if (waiting == null) {
+                    waiting = new ArrayList<>();
+                    histogramSnapshotFutures.put(name, waiting);
+                }
+
+                final CompletableFuture<Snapshot> pending = new CompletableFuture<>();
+                waiting.add(pending);
+
+                return pending;
+            }
+        }
+    }
+
+    /**
+     * Small manually completable {@link Future} for tests.
+     *
+     * <p>The future is considered done as soon as it holds a non-null value or an exception;
+     * completing it with {@code null} is therefore not recognized as completion. Cancellation
+     * is stored as a {@link CancellationException} and surfaces from the {@code get} methods
+     * wrapped in an {@link ExecutionException}, like any other failure.
+     *
+     * <p>All state is read and written while holding {@code lock}, so the future can be
+     * completed from a different thread than the one waiting on it.
+     *
+     * @param <T> type of the value the future is completed with
+     */
+    static class CompletableFuture<T> implements Future<T> {
+
+        private Exception exception = null;
+        private T value = null;
+
+        /** Monitor guarding {@code value} and {@code exception}; waiters block on it. */
+        private final Object lock = new Object();
+
+        /**
+         * Cancels the future unless it is already done.
+         *
+         * @param mayInterruptIfRunning ignored, there is no running computation to interrupt
+         * @return true if this call cancelled the future, false if it was already done
+         */
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            synchronized (lock) {
+                if (isDone()) {
+                    return false;
+                } else {
+                    exception = new CancellationException("Future was cancelled.");
+
+                    lock.notifyAll();
+
+                    return true;
+                }
+            }
+        }
+
+        @Override
+        public boolean isCancelled() {
+            synchronized (lock) {
+                return exception instanceof CancellationException;
+            }
+        }
+
+        @Override
+        public boolean isDone() {
+            synchronized (lock) {
+                return value != null || exception != null;
+            }
+        }
+
+        /**
+         * Blocks until the future is completed, failed or cancelled.
+         *
+         * @return the value the future was completed with
+         * @throws InterruptedException if the waiting thread is interrupted
+         * @throws ExecutionException if the future was failed or cancelled
+         */
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            synchronized (lock) {
+                // check and wait under the same monitor so that a notification sent between
+                // the check and the wait cannot be missed; a cancelled future is also done
+                while (!isDone()) {
+                    lock.wait();
+                }
+
+                if (exception != null) {
+                    throw new ExecutionException(exception);
+                } else {
+                    return value;
+                }
+            }
+        }
+
+        /**
+         * Blocks until the future is completed, failed or cancelled, or the timeout elapses.
+         *
+         * @param timeout maximum time to wait
+         * @param unit unit of {@code timeout}
+         * @return the value the future was completed with
+         * @throws InterruptedException if the waiting thread is interrupted
+         * @throws ExecutionException if the future was failed or cancelled
+         * @throws TimeoutException if the future is still not done when the timeout elapses
+         */
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            long remainingMs = unit.toMillis(timeout);
+            final long deadlineMs = remainingMs + System.currentTimeMillis();
+
+            synchronized (lock) {
+                // remainingMs is already in milliseconds and must not be converted again;
+                // the "> 0" guard also avoids wait(0), which would block without a timeout
+                while (!isDone() && remainingMs > 0) {
+                    lock.wait(remainingMs);
+
+                    remainingMs = deadlineMs - System.currentTimeMillis();
+                }
+
+                if (exception != null) {
+                    throw new ExecutionException(exception);
+                } else if (value != null) {
+                    return value;
+                } else {
+                    throw new TimeoutException(
+                        "Future was not completed within " + timeout + " " + unit + ".");
+                }
+            }
+        }
+
+        /**
+         * Completes the future with the given value unless it is already done.
+         *
+         * @param value value to hand to waiting callers
+         * @return true if this call completed the future, false if it was already done
+         */
+        public boolean complete(T value) {
+            synchronized (lock) {
+                if (!isDone()) {
+                    this.value = value;
+
+                    lock.notifyAll();
+
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
+        /**
+         * Fails the future with the given exception unless it is already done.
+         *
+         * @param exception cause reported to waiting callers inside an ExecutionException
+         * @return true if this call failed the future, false if it was already done
+         */
+        public boolean fail(Exception exception) {
+            synchronized (lock) {
+                if (!isDone()) {
+                    this.exception = exception;
+
+                    lock.notifyAll();
+
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+ <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml
new file mode 100644
index 0000000..c4f51da
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-ganglia</artifactId>
+ <name>flink-metrics-ganglia</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>info.ganglia.gmetric4j</groupId>
+ <artifactId>gmetric4j</artifactId>
+ <version>1.0.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
new file mode 100644
index 0000000..adf9394
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.ganglia;
+
+import com.codahale.metrics.ScheduledReporter;
+
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that forwards metrics to Ganglia through the Dropwizard
+ * {@code com.codahale.metrics.ganglia.GangliaReporter}.
+ *
+ * <p>Besides the host, port, prefix and conversion arguments (constants that are
+ * not declared in this class and are presumably inherited from
+ * {@link ScheduledDropwizardReporter}), the reporter understands the arguments
+ * declared below.
+ */
+@PublicEvolving
+public class GangliaReporter extends ScheduledDropwizardReporter {
+
+    /** Argument whose value is passed to the Dropwizard builder's {@code withDMax}; defaults to 0. */
+    public static final String ARG_DMAX = "dmax";
+    /** Argument whose value is passed to the Dropwizard builder's {@code withTMax}; defaults to 60. */
+    public static final String ARG_TMAX = "tmax";
+    /** Argument holding the time-to-live handed to {@link GMetric}; defaults to 1. */
+    public static final String ARG_TTL = "ttl";
+    /** Argument naming a {@code GMetric.UDPAddressingMode} constant; defaults to "MULTICAST". */
+    public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+    /**
+     * Builds the Dropwizard Ganglia reporter from the given configuration.
+     *
+     * @param config reporter arguments
+     * @return the configured Dropwizard reporter
+     * @throws IllegalArgumentException if host or port are missing or invalid, or if the
+     *         addressing mode or a conversion unit does not name a valid enum constant
+     * @throws RuntimeException if the Ganglia client could not be created
+     */
+    @Override
+    public ScheduledReporter getReporter(Configuration config) {
+
+        try {
+            String host = config.getString(ARG_HOST, null);
+            int port = config.getInteger(ARG_PORT, -1);
+            if (host == null || host.length() == 0 || port < 1) {
+                throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+            }
+            String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+            // The TTL must lie within 0..255 to be accepted by a multicast socket; the
+            // previous default of -1 made the default (MULTICAST) configuration fail.
+            int ttl = config.getInteger(ARG_TTL, 1);
+            GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
+
+            String prefix = config.getString(ARG_PREFIX, null);
+            String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
+            String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
+            int dMax = config.getInteger(ARG_DMAX, 0);
+            int tMax = config.getInteger(ARG_TMAX, 60);
+
+            com.codahale.metrics.ganglia.GangliaReporter.Builder builder =
+                com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+            // optional settings are only applied when explicitly configured
+            if (prefix != null) {
+                builder.prefixedWith(prefix);
+            }
+            if (conversionRate != null) {
+                builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
+            }
+            if (conversionDuration != null) {
+                builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
+            }
+            builder.withDMax(dMax);
+            builder.withTMax(tMax);
+
+            return builder.build(gMetric);
+        } catch (IOException e) {
+            throw new RuntimeException("Error while instantiating GangliaReporter.", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml
new file mode 100644
index 0000000..45fb018
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-graphite</artifactId>
+ <name>flink-metrics-graphite</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
new file mode 100644
index 0000000..16be830
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that forwards metrics to Graphite through the Dropwizard
+ * {@code com.codahale.metrics.graphite.GraphiteReporter}.
+ */
+@PublicEvolving
+public class GraphiteReporter extends ScheduledDropwizardReporter {
+
+    /**
+     * Builds the Dropwizard Graphite reporter from the given configuration.
+     *
+     * <p>Host and port are mandatory. The prefix and the rate/duration conversion
+     * units are applied only if they are configured.
+     *
+     * @param config reporter arguments
+     * @return the configured Dropwizard reporter
+     * @throws IllegalArgumentException if the host is missing or empty, or the port is smaller than 1
+     */
+    @Override
+    public ScheduledReporter getReporter(Configuration config) {
+        final String graphiteHost = config.getString(ARG_HOST, null);
+        final int graphitePort = config.getInteger(ARG_PORT, -1);
+
+        final boolean hostMissing = graphiteHost == null || graphiteHost.length() == 0;
+        if (hostMissing || graphitePort < 1) {
+            throw new IllegalArgumentException("Invalid host/port configuration. Host: " + graphiteHost + " Port: " + graphitePort);
+        }
+
+        final String metricPrefix = config.getString(ARG_PREFIX, null);
+        final String rateUnit = config.getString(ARG_CONVERSION_RATE, null);
+        final String durationUnit = config.getString(ARG_CONVERSION_DURATION, null);
+
+        final com.codahale.metrics.graphite.GraphiteReporter.Builder reporterBuilder =
+            com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
+
+        if (metricPrefix != null) {
+            reporterBuilder.prefixedWith(metricPrefix);
+        }
+        if (rateUnit != null) {
+            reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        if (durationUnit != null) {
+            reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+
+        final Graphite graphiteClient = new Graphite(graphiteHost, graphitePort);
+        return reporterBuilder.build(graphiteClient);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml
new file mode 100644
index 0000000..8ee0b56
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-statsd</artifactId>
+ <name>flink-metrics-statsd</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
new file mode 100644
index 0000000..3d9bf07
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Largely based on the StatsDReporter class by ReadyTalk
+ * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * Ported since it was not present in maven central.
+ */
+@PublicEvolving
+public class StatsDReporter extends AbstractReporter implements Scheduled {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+// public static final String ARG_CONVERSION_RATE = "rateConversion";
+// public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+ private boolean closed = false;
+
+ private DatagramSocket socket;
+ private InetSocketAddress address;
+
+ @Override
+ public void open(Configuration config) {
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ this.address = new InetSocketAddress(host, port);
+
+// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
+// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
+
+ try {
+ this.socket = new DatagramSocket(0);
+ } catch (SocketException e) {
+ throw new RuntimeException("Could not create datagram socket. ", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ if (socket != null && !socket.isClosed()) {
+ socket.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void report() {
+ // instead of locking here, we tolerate exceptions
+ // we do this to prevent holding the lock for very long and blocking
+ // operator creation and shutdown
+ try {
+ for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+ if (closed) {
+ return;
+ }
+ reportGauge(entry.getValue(), entry.getKey());
+ }
+
+ for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+ if (closed) {
+ return;
+ }
+ reportCounter(entry.getValue(), entry.getKey());
+ }
+
+ for (Map.Entry<Histogram, String> entry : histograms.entrySet()) {
+ reportHistogram(entry.getValue(), entry.getKey());
+ }
+ }
+ catch (ConcurrentModificationException | NoSuchElementException e) {
+ // ignore - may happen when metrics are concurrently added or removed
+ // report next time
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void reportCounter(final String name, final Counter counter) {
+ send(name, String.valueOf(counter.getCount()));
+ }
+
+ private void reportGauge(final String name, final Gauge<?> gauge) {
+ Object value = gauge.getValue();
+ if (value != null) {
+ send(name, value.toString());
+ }
+ }
+
+ private void reportHistogram(final String name, final Histogram histogram) {
+ if (histogram != null) {
+
+ HistogramStatistics statistics = histogram.getStatistics();
+
+ if (statistics != null) {
+ send(prefix(name, "count"), String.valueOf(histogram.getCount()));
+ send(prefix(name, "max"), String.valueOf(statistics.getMax()));
+ send(prefix(name, "min"), String.valueOf(statistics.getMin()));
+ send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
+ send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
+ send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
+ send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
+ send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
+ send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
+ send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
+ send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+ }
+ }
+ }
+
+ private String prefix(String ... names) {
+ if (names.length > 0) {
+ StringBuilder stringBuilder = new StringBuilder(names[0]);
+
+ for (int i = 1; i < names.length; i++) {
+ stringBuilder.append('.').append(names[i]);
+ }
+
+ return stringBuilder.toString();
+ } else {
+ return "";
+ }
+ }
+
+ private void send(final String name, final String value) {
+ try {
+ String formatted = String.format("%s:%s|g", name, value);
+ byte[] data = formatted.getBytes();
+ socket.send(new DatagramPacket(data, data.length, this.address));
+ }
+ catch (IOException e) {
+ LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
new file mode 100644
index 0000000..5f5ef40
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+
+public class StatsDReporterTest extends TestLogger {
+
+ /**
+ * Tests that histograms are properly reported via the StatsD reporter
+ */
+ @Test
+ public void testStatsDHistogramReporting() throws Exception {
+ MetricRegistry registry = null;
+ DatagramSocketReceiver receiver = null;
+ Thread receiverThread = null;
+ long timeout = 5000;
+ long joinTimeout = 30000;
+
+ String histogramName = "histogram";
+
+ try {
+ receiver = new DatagramSocketReceiver();
+
+ receiverThread = new Thread(receiver);
+
+ receiverThread.start();
+
+ int port = receiver.getPort();
+
+ Configuration config = new Configuration();
+ config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
+ config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS");
+ config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
+
+ registry = new MetricRegistry(config);
+
+ TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+ TestingHistogram histogram = new TestingHistogram();
+
+ metricGroup.histogram(histogramName, histogram);
+
+ receiver.waitUntilNumLines(11, timeout);
+
+ Set<String> lines = receiver.getLines();
+
+ String prefix = metricGroup.getScopeString() + "." + histogramName;
+
+ Set<String> expectedLines = new HashSet<>();
+
+ expectedLines.add(prefix + ".count:1|g");
+ expectedLines.add(prefix + ".mean:3.0|g");
+ expectedLines.add(prefix + ".min:6|g");
+ expectedLines.add(prefix + ".max:5|g");
+ expectedLines.add(prefix + ".stddev:4.0|g");
+ expectedLines.add(prefix + ".p75:0.75|g");
+ expectedLines.add(prefix + ".p98:0.98|g");
+ expectedLines.add(prefix + ".p99:0.99|g");
+ expectedLines.add(prefix + ".p999:0.999|g");
+ expectedLines.add(prefix + ".p95:0.95|g");
+ expectedLines.add(prefix + ".p50:0.5|g");
+
+ assertEquals(expectedLines, lines);
+
+ } finally {
+ if (registry != null) {
+ registry.shutdown();
+ }
+
+ if (receiver != null) {
+ receiver.stop();
+ }
+
+ if (receiverThread != null) {
+ receiverThread.join(joinTimeout);
+ }
+ }
+ }
+
+ public static class TestingHistogram implements Histogram {
+
+ @Override
+ public void update(long value) {
+
+ }
+
+ @Override
+ public long getCount() {
+ return 1;
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new HistogramStatistics() {
+ @Override
+ public double getQuantile(double quantile) {
+ return quantile;
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
+ }
+
+ @Override
+ public int size() {
+ return 2;
+ }
+
+ @Override
+ public double getMean() {
+ return 3;
+ }
+
+ @Override
+ public double getStdDev() {
+ return 4;
+ }
+
+ @Override
+ public long getMax() {
+ return 5;
+ }
+
+ @Override
+ public long getMin() {
+ return 6;
+ }
+ };
+ }
+ }
+
+ public static class DatagramSocketReceiver implements Runnable {
+ private static final Object obj = new Object();
+
+ private final DatagramSocket socket;
+ private final ConcurrentHashMap<String, Object> lines;
+
+ private boolean running = true;
+
+ public DatagramSocketReceiver() throws SocketException {
+ socket = new DatagramSocket();
+ lines = new ConcurrentHashMap<>();
+ }
+
+ public int getPort() {
+ return socket.getLocalPort();
+ }
+
+ public void stop() {
+ running = false;
+ socket.close();
+ }
+
+ public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException {
+ long endTimeout = System.currentTimeMillis() + timeout;
+ long remainingTimeout = timeout;
+
+ while (numberLines > lines.size() && remainingTimeout > 0) {
+ synchronized (lines) {
+ try {
+ lines.wait(remainingTimeout);
+ } catch (InterruptedException e) {
+ // ignore interruption exceptions
+ }
+ }
+
+ remainingTimeout = endTimeout - System.currentTimeMillis();
+ }
+
+ if (remainingTimeout <= 0) {
+ throw new TimeoutException("Have not received " + numberLines + " in time.");
+ }
+ }
+
+ public Set<String> getLines() {
+ return lines.keySet();
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ byte[] buffer = new byte[1024];
+
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+ socket.receive(packet);
+
+ String line = new String(packet.getData(), 0, packet.getLength());
+
+ lines.put(line, obj);
+
+ synchronized (lines) {
+ lines.notifyAll();
+ }
+ } catch (IOException ex) {
+ // ignore the exceptions
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+ <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
new file mode 100644
index 0000000..542f49c
--- /dev/null
+++ b/flink-metrics/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics</artifactId>
+ <name>flink-metrics</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-metrics-dropwizard</module>
+ <module>flink-metrics-ganglia</module>
+ <module>flink-metrics-graphite</module>
+ <module>flink-metrics-statsd</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9dc8846..9da3fa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
<module>flink-quickstart</module>
<module>flink-contrib</module>
<module>flink-dist</module>
- <module>flink-metric-reporters</module>
+ <module>flink-metrics</module>
</modules>
<properties>
[3/3] flink git commit: [FLINK-3951] Add Histogram metric type
Posted by ch...@apache.org.
[FLINK-3951] Add Histogram metric type
This closes #2112
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c7a88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c7a88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c7a88
Branch: refs/heads/master
Commit: ee3c7a88bb74232e4884899699aaa08ae2b6e038
Parents: d43bf8d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 14 19:04:43 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 15:32:03 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/metrics/Histogram.java | 52 +++
.../flink/metrics/HistogramStatistics.java | 81 +++++
.../org/apache/flink/metrics/MetricGroup.java | 20 ++
.../metrics/groups/AbstractMetricGroup.java | 12 +
.../groups/UnregisteredMetricsGroup.java | 12 +-
.../metrics/reporter/AbstractReporter.java | 15 +
.../flink/metrics/reporter/JMXReporter.java | 98 +++++-
.../flink/metrics/MetricRegistryTest.java | 3 +-
.../groups/MetricGroupRegistrationTest.java | 21 ++
.../flink/metrics/reporter/JMXReporterTest.java | 108 ++++++-
.../flink-metrics-dropwizard/pom.xml | 72 -----
.../dropwizard/ScheduledDropwizardReporter.java | 140 --------
.../dropwizard/metrics/CounterWrapper.java | 33 --
.../flink/dropwizard/metrics/GaugeWrapper.java | 41 ---
.../flink-metrics-ganglia/pom.xml | 90 ------
.../flink/metrics/ganglia/GangliaReporter.java | 79 -----
.../flink-metrics-graphite/pom.xml | 84 -----
.../metrics/graphite/GraphiteReporter.java | 63 ----
.../flink-metrics-statsd/pom.xml | 43 ---
.../flink/metrics/statsd/StatsDReporter.java | 143 ---------
flink-metric-reporters/pom.xml | 42 ---
flink-metrics/flink-metrics-dropwizard/pom.xml | 80 +++++
.../dropwizard/ScheduledDropwizardReporter.java | 162 ++++++++++
.../metrics/DropwizardHistogramStatistics.java | 70 ++++
.../metrics/DropwizardHistogramWrapper.java | 53 +++
.../dropwizard/metrics/FlinkCounterWrapper.java | 33 ++
.../dropwizard/metrics/FlinkGaugeWrapper.java | 41 +++
.../metrics/FlinkHistogramWrapper.java | 52 +++
.../metrics/HistogramStatisticsWrapper.java | 86 +++++
.../DropwizardFlinkHistogramWrapperTest.java | 319 +++++++++++++++++++
.../src/test/resources/log4j-test.properties | 27 ++
.../src/test/resources/logback-test.xml | 34 ++
flink-metrics/flink-metrics-ganglia/pom.xml | 90 ++++++
.../flink/metrics/ganglia/GangliaReporter.java | 79 +++++
flink-metrics/flink-metrics-graphite/pom.xml | 84 +++++
.../metrics/graphite/GraphiteReporter.java | 63 ++++
flink-metrics/flink-metrics-statsd/pom.xml | 51 +++
.../flink/metrics/statsd/StatsDReporter.java | 184 +++++++++++
.../metrics/statsd/StatsDReporterTest.java | 236 ++++++++++++++
.../src/test/resources/log4j-test.properties | 27 ++
.../src/test/resources/logback-test.xml | 34 ++
flink-metrics/pom.xml | 42 +++
pom.xml | 2 +-
43 files changed, 2264 insertions(+), 837 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
new file mode 100644
index 0000000..3fd1253
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram interface to be used with Flink's metrics system.
+ *
+ * The histogram allows recording values, getting the current count of recorded values and creating
+ * histogram statistics for the currently seen elements.
+ */
+@PublicEvolving
+public interface Histogram extends Metric {
+
+ /**
+ * Update the histogram with the given value.
+ *
+ * @param value Value to update the histogram with
+ */
+ void update(long value);
+
+ /**
+ * Get the count of seen elements.
+ *
+ * @return Count of seen elements
+ */
+ long getCount();
+
+ /**
+ * Create statistics for the currently recorded elements.
+ *
+ * @return Statistics about the currently recorded elements
+ */
+ HistogramStatistics getStatistics();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
new file mode 100644
index 0000000..476580c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram statistics represent the current snapshot of elements recorded in the histogram.
+ *
+ * The histogram statistics allow calculating values for quantiles, the mean, the standard
+ * deviation, the minimum and the maximum.
+ */
+@PublicEvolving
+public abstract class HistogramStatistics {
+
+ /**
+ * Returns the value for the given quantile based on the represented histogram statistics.
+ *
+ * @param quantile Quantile to calculate the value for
+ * @return Value for the given quantile
+ */
+ public abstract double getQuantile(double quantile);
+
+ /**
+ * Returns the elements of the statistics' sample.
+ *
+ * @return Elements of the statistics' sample
+ */
+ public abstract long[] getValues();
+
+ /**
+ * Returns the size of the statistics' sample.
+ *
+ * @return Size of the statistics' sample
+ */
+ public abstract int size();
+
+ /**
+ * Returns the mean of the histogram values.
+ *
+ * @return Mean of the histogram values
+ */
+ public abstract double getMean();
+
+ /**
+ * Returns the standard deviation of the distribution reflected by the histogram statistics.
+ *
+ * @return Standard deviation of histogram distribution
+ */
+ public abstract double getStdDev();
+
+ /**
+ * Returns the maximum value of the histogram.
+ *
+ * @return Maximum value of the histogram
+ */
+ public abstract long getMax();
+
+ /**
+ * Returns the minimum value of the histogram.
+ *
+ * @return Minimum value of the histogram
+ */
+ public abstract long getMin();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index b131949..f46d3fc 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -115,6 +115,26 @@ public interface MetricGroup {
*/
<T, G extends Gauge<T>> G gauge(String name, G gauge);
+ /**
+ * Registers a new {@link Histogram} with Flink.
+ *
+ * @param name name of the histogram
+ * @param histogram histogram to register
+ * @param <H> histogram type
+ * @return the registered histogram
+ */
+ <H extends Histogram> H histogram(String name, H histogram);
+
+ /**
+ * Registers a new {@link Histogram} with Flink.
+ *
+ * @param name name of the histogram
+ * @param histogram histogram to register
+ * @param <H> histogram type
+ * @return the registered histogram
+ */
+ <H extends Histogram> H histogram(int name, H histogram);
+
// ------------------------------------------------------------------------
// Groups
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 93eb734..112957e 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
@@ -172,6 +173,17 @@ public abstract class AbstractMetricGroup implements MetricGroup {
return gauge;
}
+ @Override
+ public <H extends Histogram> H histogram(int name, H histogram) {
+ return histogram(String.valueOf(name), histogram);
+ }
+
+ @Override
+ public <H extends Histogram> H histogram(String name, H histogram) {
+ addMetric(name, histogram);
+ return histogram;
+ }
+
/**
* Adds the given metric to the group and registers it at the registry, if the group
* is not yet closed, and if no metric with the same name has been registered before.
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 29d71d9..8e183df 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
@@ -71,7 +72,16 @@ public class UnregisteredMetricsGroup implements MetricGroup {
return gauge;
}
-
+ @Override
+ public <H extends Histogram> H histogram(int name, H histogram) {
+ return histogram;
+ }
+
+ @Override
+ public <H extends Histogram> H histogram(String name, H histogram) {
+ return histogram;
+ }
+
@Override
public MetricGroup addGroup(int name) {
return addGroup(String.valueOf(name));
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index f2e78bf..8dacb7c 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -21,8 +21,11 @@ package org.apache.flink.metrics.reporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -32,9 +35,11 @@ import java.util.Map;
*/
@PublicEvolving
public abstract class AbstractReporter implements MetricReporter {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
protected final Map<Gauge<?>, String> gauges = new HashMap<>();
protected final Map<Counter, String> counters = new HashMap<>();
+ protected final Map<Histogram, String> histograms = new HashMap<>();
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
@@ -45,6 +50,11 @@ public abstract class AbstractReporter implements MetricReporter {
counters.put((Counter) metric, name);
} else if (metric instanceof Gauge) {
gauges.put((Gauge<?>) metric, name);
+ } else if (metric instanceof Histogram) {
+ histograms.put((Histogram) metric, name);
+ } else {
+ log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+ "does not support this metric type.", metric.getClass().getName());
}
}
}
@@ -56,6 +66,11 @@ public abstract class AbstractReporter implements MetricReporter {
counters.remove(metric);
} else if (metric instanceof Gauge) {
gauges.remove(metric);
+ } else if (metric instanceof Histogram) {
+ histograms.remove(metric);
+ } else {
+ log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+ "does not support this metric type.", metric.getClass().getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index 326d6d7..eaf0ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.util.NetUtils;
@@ -146,8 +147,11 @@ public class JMXReporter implements MetricReporter {
jmxMetric = new JmxGauge((Gauge<?>) metric);
} else if (metric instanceof Counter) {
jmxMetric = new JmxCounter((Counter) metric);
+ } else if (metric instanceof Histogram) {
+ jmxMetric = new JmxHistogram((Histogram) metric);
} else {
- LOG.error("Unknown metric type: " + metric.getClass().getName());
+ LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " +
+ "is not supported by this reporter.", metric.getClass().getName());
return;
}
@@ -285,7 +289,7 @@ public class JMXReporter implements MetricReporter {
private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
private Counter counter;
- public JmxCounter(Counter counter) {
+ JmxCounter(Counter counter) {
this.counter = counter;
}
@@ -303,7 +307,7 @@ public class JMXReporter implements MetricReporter {
private final Gauge<?> gauge;
- public JmxGauge(Gauge<?> gauge) {
+ JmxGauge(Gauge<?> gauge) {
this.gauge = gauge;
}
@@ -313,6 +317,94 @@ public class JMXReporter implements MetricReporter {
}
}
+ public interface JmxHistogramMBean extends MetricMBean {
+ long getCount();
+
+ double getMean();
+
+ double getStdDev();
+
+ long getMax();
+
+ long getMin();
+
+ double getMedian();
+
+ double get75thPercentile();
+
+ double get95thPercentile();
+
+ double get98thPercentile();
+
+ double get99thPercentile();
+
+ double get999thPercentile();
+ }
+
+ private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {
+
+ private final Histogram histogram;
+
+ JmxHistogram(Histogram histogram) {
+ this.histogram = histogram;
+ }
+
+ @Override
+ public long getCount() {
+ return histogram.getCount();
+ }
+
+ @Override
+ public double getMean() {
+ return histogram.getStatistics().getMean();
+ }
+
+ @Override
+ public double getStdDev() {
+ return histogram.getStatistics().getStdDev();
+ }
+
+ @Override
+ public long getMax() {
+ return histogram.getStatistics().getMax();
+ }
+
+ @Override
+ public long getMin() {
+ return histogram.getStatistics().getMin();
+ }
+
+ @Override
+ public double getMedian() {
+ return histogram.getStatistics().getQuantile(0.5);
+ }
+
+ @Override
+ public double get75thPercentile() {
+ return histogram.getStatistics().getQuantile(0.75);
+ }
+
+ @Override
+ public double get95thPercentile() {
+ return histogram.getStatistics().getQuantile(0.95);
+ }
+
+ @Override
+ public double get98thPercentile() {
+ return histogram.getStatistics().getQuantile(0.98);
+ }
+
+ @Override
+ public double get99thPercentile() {
+ return histogram.getStatistics().getQuantile(0.99);
+ }
+
+ @Override
+ public double get999thPercentile() {
+ return histogram.getStatistics().getQuantile(0.999);
+ }
+ }
+
/**
* JMX Server implementation that JMX clients can connect to.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index f8e0bf5..8b71816 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -25,12 +25,13 @@ import org.apache.flink.metrics.groups.scope.ScopeFormats;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
-public class MetricRegistryTest {
+public class MetricRegistryTest extends TestLogger {
/**
* Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index 7b35d91..c7a112a 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.metrics.groups;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
@@ -57,6 +59,25 @@ public class MetricGroupRegistrationTest {
Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
assertEquals("gauge", TestReporter1.lastPassedName);
+ Histogram histogram = root.histogram("histogram", new Histogram() {
+ @Override
+ public void update(long value) {
+
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return null;
+ }
+ });
+
+ Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
+ assertEquals("histogram", TestReporter1.lastPassedName);
registry.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index d25f744..9e638a7 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -20,11 +20,16 @@ package org.apache.flink.metrics.reporter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
@@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
import static org.junit.Assert.assertEquals;
-public class JMXReporterTest {
+public class JMXReporterTest extends TestLogger {
@Test
public void testReplaceInvalidChars() {
@@ -188,4 +193,105 @@ public class JMXReporterTest {
rep2.close();
reg.shutdown();
}
+
+ /**
+ * Tests that histograms are properly reported via the JMXReporter.
+ */
+ @Test
+ public void testHistogramReporting() throws Exception {
+ MetricRegistry registry = null;
+ String histogramName = "histogram";
+
+ try {
+ Configuration config = new Configuration();
+
+ registry = new MetricRegistry(config);
+
+ TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+ TestingHistogram histogram = new TestingHistogram();
+
+ metricGroup.histogram(histogramName, histogram);
+
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents()));
+
+ MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+
+ MBeanAttributeInfo[] attributeInfos = info.getAttributes();
+
+ assertEquals(11, attributeInfos.length);
+
+ assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count"));
+ assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean"));
+ assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev"));
+ assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max"));
+ assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min"));
+ assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median"));
+ assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile"));
+ assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile"));
+ assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile"));
+ assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile"));
+ assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile"));
+
+ } finally {
+ if (registry != null) {
+ registry.shutdown();
+ }
+ }
+ }
+
+ static class TestingHistogram implements Histogram {
+
+ @Override
+ public void update(long value) {
+
+ }
+
+ @Override
+ public long getCount() {
+ return 1;
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ return new HistogramStatistics() {
+ @Override
+ public double getQuantile(double quantile) {
+ return quantile;
+ }
+
+ @Override
+ public long[] getValues() {
+ return new long[0];
+ }
+
+ @Override
+ public int size() {
+ return 3;
+ }
+
+ @Override
+ public double getMean() {
+ return 4;
+ }
+
+ @Override
+ public double getStdDev() {
+ return 5;
+ }
+
+ @Override
+ public long getMax() {
+ return 6;
+ }
+
+ @Override
+ public long getMin() {
+ return 7;
+ }
+ };
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
deleted file mode 100644
index a386880..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metric-reporters</artifactId>
- <version>1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-metrics-dropwizard</artifactId>
- <name>flink-metrics-dropwizard</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>${metrics.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
deleted file mode 100644
index d67f3e3..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-import com.codahale.metrics.ScheduledReporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.metrics.CounterWrapper;
-import org.apache.flink.dropwizard.metrics.GaugeWrapper;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
- * Dropwizard {@link com.codahale.metrics.Reporter}.
- */
-@PublicEvolving
-public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
-
- public static final String ARG_HOST = "host";
- public static final String ARG_PORT = "port";
- public static final String ARG_PREFIX = "prefix";
- public static final String ARG_CONVERSION_RATE = "rateConversion";
- public static final String ARG_CONVERSION_DURATION = "durationConversion";
-
- // ------------------------------------------------------------------------
-
- protected final MetricRegistry registry;
-
- protected ScheduledReporter reporter;
-
- private final Map<Gauge<?>, String> gauges = new HashMap<>();
- private final Map<Counter, String> counters = new HashMap<>();
-
- // ------------------------------------------------------------------------
-
- protected ScheduledDropwizardReporter() {
- this.registry = new MetricRegistry();
- }
-
- // ------------------------------------------------------------------------
- // life cycle
- // ------------------------------------------------------------------------
-
- @Override
- public void open(Configuration config) {
- this.reporter = getReporter(config);
- }
-
- @Override
- public void close() {
- this.reporter.stop();
- }
-
- // ------------------------------------------------------------------------
- // adding / removing metrics
- // ------------------------------------------------------------------------
-
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
- final String fullName = group.getScopeString() + '.' + metricName;
-
- synchronized (this) {
- if (metric instanceof Counter) {
- counters.put((Counter) metric, fullName);
- registry.register(fullName, new CounterWrapper((Counter) metric));
- }
- else if (metric instanceof Gauge) {
- gauges.put((Gauge<?>) metric, fullName);
- registry.register(fullName, GaugeWrapper.fromGauge((Gauge<?>) metric));
- }
- }
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
- synchronized (this) {
- String fullName;
-
- if (metric instanceof Counter) {
- fullName = counters.remove(metric);
- } else if (metric instanceof Gauge) {
- fullName = gauges.remove(metric);
- } else {
- fullName = null;
- }
-
- if (fullName != null) {
- registry.remove(fullName);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // scheduled reporting
- // ------------------------------------------------------------------------
-
- @Override
- public void report() {
- // we do not need to lock here, because the dropwizard registry is
- // internally a concurrent map
- @SuppressWarnings("rawtypes")
- final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
- final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
- final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
- final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters();
- final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
-
- this.reporter.report(gauges, counters, histograms, meters, timers);
- }
-
- public abstract ScheduledReporter getReporter(Configuration config);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
deleted file mode 100644
index f6630b9..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Counter;
-
-public class CounterWrapper extends com.codahale.metrics.Counter {
- private final Counter counter;
-
- public CounterWrapper(Counter counter) {
- this.counter = counter;
- }
-
- @Override
- public long getCount() {
- return this.counter.getCount();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
deleted file mode 100644
index 655cd60..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
-
- private final Gauge<T> gauge;
-
- public GaugeWrapper(Gauge<T> gauge) {
- this.gauge = gauge;
- }
-
- @Override
- public T getValue() {
- return this.gauge.getValue();
- }
-
- public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) {
- @SuppressWarnings("unchecked")
- Gauge<T> typedGauge = (Gauge<T>) gauge;
- return new GaugeWrapper<>(typedGauge);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml b/flink-metric-reporters/flink-metrics-ganglia/pom.xml
deleted file mode 100644
index a457ca1..0000000
--- a/flink-metric-reporters/flink-metrics-ganglia/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metric-reporters</artifactId>
- <version>1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-metrics-ganglia</artifactId>
- <name>flink-metrics-ganglia</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-dropwizard</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>info.ganglia.gmetric4j</groupId>
- <artifactId>gmetric4j</artifactId>
- <version>1.0.7</version>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>${metrics.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-ganglia</artifactId>
- <version>${metrics.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
deleted file mode 100644
index adf9394..0000000
--- a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.ganglia;
-
-import com.codahale.metrics.ScheduledReporter;
-
-import info.ganglia.gmetric4j.gmetric.GMetric;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GangliaReporter extends ScheduledDropwizardReporter {
-
- public static final String ARG_DMAX = "dmax";
- public static final String ARG_TMAX = "tmax";
- public static final String ARG_TTL = "ttl";
- public static final String ARG_MODE_ADDRESSING = "addressingMode";
-
- @Override
- public ScheduledReporter getReporter(Configuration config) {
-
- try {
- String host = config.getString(ARG_HOST, null);
- int port = config.getInteger(ARG_PORT, -1);
- if (host == null || host.length() == 0 || port < 1) {
- throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
- }
- String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
- int ttl = config.getInteger(ARG_TTL, -1);
- GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
-
- String prefix = config.getString(ARG_PREFIX, null);
- String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
- String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
- int dMax = config.getInteger(ARG_DMAX, 0);
- int tMax = config.getInteger(ARG_TMAX, 60);
-
- com.codahale.metrics.ganglia.GangliaReporter.Builder builder =
- com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
-
- if (prefix != null) {
- builder.prefixedWith(prefix);
- }
- if (conversionRate != null) {
- builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
- }
- if (conversionDuration != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
- }
- builder.withDMax(dMax);
- builder.withTMax(tMax);
-
- return builder.build(gMetric);
- } catch (IOException e) {
- throw new RuntimeException("Error while instantiating GangliaReporter.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml b/flink-metric-reporters/flink-metrics-graphite/pom.xml
deleted file mode 100644
index 714b77f..0000000
--- a/flink-metric-reporters/flink-metrics-graphite/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metric-reporters</artifactId>
- <version>1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-metrics-graphite</artifactId>
- <name>flink-metrics-graphite</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-dropwizard</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>${metrics.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-graphite</artifactId>
- <version>${metrics.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
deleted file mode 100644
index 16be830..0000000
--- a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GraphiteReporter extends ScheduledDropwizardReporter {
-
- @Override
- public ScheduledReporter getReporter(Configuration config) {
- String host = config.getString(ARG_HOST, null);
- int port = config.getInteger(ARG_PORT, -1);
-
- if (host == null || host.length() == 0 || port < 1) {
- throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
- }
-
- String prefix = config.getString(ARG_PREFIX, null);
- String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
- String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
-
- com.codahale.metrics.graphite.GraphiteReporter.Builder builder =
- com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
-
- if (prefix != null) {
- builder.prefixedWith(prefix);
- }
-
- if (conversionRate != null) {
- builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
- }
-
- if (conversionDuration != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
- }
-
- return builder.build(new Graphite(host, port));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml b/flink-metric-reporters/flink-metrics-statsd/pom.xml
deleted file mode 100644
index 3052a10..0000000
--- a/flink-metric-reporters/flink-metrics-statsd/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metric-reporters</artifactId>
- <version>1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-metrics-statsd</artifactId>
- <name>flink-metrics-statsd</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
deleted file mode 100644
index 087a265..0000000
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.statsd;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.util.ConcurrentModificationException;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Largely based on the StatsDReporter class by ReadyTalk
- * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
- *
- * Ported since it was not present in maven central.
- */
-@PublicEvolving
-public class StatsDReporter extends AbstractReporter implements Scheduled {
-
- private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
-
- public static final String ARG_HOST = "host";
- public static final String ARG_PORT = "port";
-// public static final String ARG_CONVERSION_RATE = "rateConversion";
-// public static final String ARG_CONVERSION_DURATION = "durationConversion";
-
- private boolean closed = false;
-
- private DatagramSocket socket;
- private InetSocketAddress address;
-
- @Override
- public void open(Configuration config) {
- String host = config.getString(ARG_HOST, null);
- int port = config.getInteger(ARG_PORT, -1);
-
- if (host == null || host.length() == 0 || port < 1) {
- throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
- }
-
- this.address = new InetSocketAddress(host, port);
-
-// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
-// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
-
- try {
- this.socket = new DatagramSocket(0);
- } catch (SocketException e) {
- throw new RuntimeException("Could not create datagram socket. ", e);
- }
- }
-
- @Override
- public void close() {
- closed = true;
- if (socket != null && !socket.isClosed()) {
- socket.close();
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void report() {
- // instead of locking here, we tolerate exceptions
- // we do this to prevent holding the lock for very long and blocking
- // operator creation and shutdown
- try {
- for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
- if (closed) {
- return;
- }
- reportGauge(entry.getValue(), entry.getKey());
- }
-
- for (Map.Entry<Counter, String> entry : counters.entrySet()) {
- if (closed) {
- return;
- }
- reportCounter(entry.getValue(), entry.getKey());
- }
- }
- catch (ConcurrentModificationException | NoSuchElementException e) {
- // ignore - may happen when metrics are concurrently added or removed
- // report next time
- }
- }
-
- // ------------------------------------------------------------------------
-
- private void reportCounter(final String name, final Counter counter) {
- send(name, String.valueOf(counter.getCount()));
- }
-
- private void reportGauge(final String name, final Gauge<?> gauge) {
- Object value = gauge.getValue();
- if (value != null) {
- send(name, value.toString());
- }
- }
-
- private void send(final String name, final String value) {
- try {
- String formatted = String.format("%s:%s|g", name, value);
- byte[] data = formatted.getBytes();
- socket.send(new DatagramPacket(data, data.length, this.address));
- }
- catch (IOException e) {
- LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml
deleted file mode 100644
index 01a809c..0000000
--- a/flink-metric-reporters/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-parent</artifactId>
- <version>1.1-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-metric-reporters</artifactId>
- <name>flink-metric-reporters</name>
- <packaging>pom</packaging>
-
- <modules>
- <module>flink-metrics-dropwizard</module>
- <module>flink-metrics-ganglia</module>
- <module>flink-metrics-graphite</module>
- <module>flink-metrics-statsd</module>
- </modules>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml
new file mode 100644
index 0000000..90dbc00
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <name>flink-metrics-dropwizard</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope> <!-- compile against flink-core, but do not package it with this module -->
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type> <!-- test classes of flink-core, visible to this module's tests only -->
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version> <!-- property is not defined in this file; presumably inherited from a parent POM -->
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef> <!-- predefined descriptor: one jar holding this module plus its runtime dependencies -->
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase> <!-- bound to the package phase, so the assembly is built on every 'mvn package' -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
new file mode 100644
index 0000000..062bbd8
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+import com.codahale.metrics.ScheduledReporter;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ *
+ * <p>Every instance owns a private {@link MetricRegistry}. Flink metrics announced through
+ * {@code notifyOfAddedMetric} are registered there under the name
+ * {@code <group scope string>.<metric name>}; counters, gauges and histograms are supported,
+ * any other metric type is only logged with a warning. {@link #report()} hands the current
+ * contents of the registry to the {@link ScheduledReporter} that {@link #getReporter}
+ * returned when {@link #open} was called.
+ *
+ * <p>Adding and removing metrics is done while holding the monitor of this instance.
+ * {@link #report()} and {@link #close()} do not take that lock, and both dereference the
+ * reporter field unconditionally, so they rely on {@link #open} having been called before.
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass()); // logger is named after the concrete subclass
+
+ public static final String ARG_HOST = "host"; // the ARG_* keys are not read in this class; presumably consumed by subclasses
+ public static final String ARG_PORT = "port";
+ public static final String ARG_PREFIX = "prefix";
+ public static final String ARG_CONVERSION_RATE = "rateConversion";
+ public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+ // ------------------------------------------------------------------------
+
+ protected final MetricRegistry registry; // Dropwizard registry holding the wrapped Flink metrics
+
+ protected ScheduledReporter reporter; // assigned in open(); null until then
+
+ private final Map<Gauge<?>, String> gauges = new HashMap<>(); // metric -> full registry name, needed to unregister it later
+ private final Map<Counter, String> counters = new HashMap<>();
+ private final Map<Histogram, String> histograms = new HashMap<>();
+
+ // ------------------------------------------------------------------------
+
+ protected ScheduledDropwizardReporter() {
+ this.registry = new MetricRegistry();
+ }
+
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration config) {
+ this.reporter = getReporter(config); // the subclass decides which Dropwizard reporter is used
+ }
+
+ @Override
+ public void close() {
+ this.reporter.stop(); // NOTE(review): throws NullPointerException if open() never assigned a reporter
+ }
+
+ // ------------------------------------------------------------------------
+ // adding / removing metrics
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+ final String fullName = group.getScopeString() + '.' + metricName; // name under which the metric is registered with Dropwizard
+
+ synchronized (this) { // bookkeeping map and registry are updated under the same lock
+ if (metric instanceof Counter) {
+ counters.put((Counter) metric, fullName);
+ registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
+ }
+ else if (metric instanceof Gauge) {
+ gauges.put((Gauge<?>) metric, fullName);
+ registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
+ } else if (metric instanceof Histogram) {
+ Histogram histogram = (Histogram) metric;
+ histograms.put(histogram, fullName);
+
+ if (histogram instanceof DropwizardHistogramWrapper) {
+ registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram()); // already backed by a Dropwizard histogram: register that instance directly
+ } else {
+ registry.register(fullName, new FlinkHistogramWrapper(histogram)); // any other Flink histogram is adapted to the Dropwizard type
+ }
+ } else {
+ log.warn("Cannot add metric of type {}. This indicates that the reporter " +
+ "does not support this metric type.", metric.getClass().getName());
+ }
+ }
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+ synchronized (this) {
+ String fullName; // looked up from the bookkeeping maps; metricName and group are not used here
+
+ if (metric instanceof Counter) {
+ fullName = counters.remove(metric);
+ } else if (metric instanceof Gauge) {
+ fullName = gauges.remove(metric);
+ } else if (metric instanceof Histogram) {
+ fullName = histograms.remove(metric);
+ } else {
+ fullName = null; // unsupported metric type: nothing was registered for it
+ }
+
+ if (fullName != null) { // null also when the metric was never recorded by this reporter
+ registry.remove(fullName);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // scheduled reporting
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void report() {
+ // we do not need to lock here, because the dropwizard registry is
+ // internally a concurrent map
+ @SuppressWarnings("rawtypes")
+ final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
+ final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
+ final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
+ final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters(); // nothing in this class registers meters or timers
+ final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
+
+ this.reporter.report(gauges, counters, histograms, meters, timers); // forwards all five metric maps to the wrapped Dropwizard reporter
+ }
+
+ public abstract ScheduledReporter getReporter(Configuration config); // factory hook, invoked from open()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
new file mode 100644
index 0000000..6f4eab2
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * {@link HistogramStatistics} view of a Dropwizard {@link Snapshot}, handed out by
+ * {@link DropwizardHistogramWrapper}. Every accessor forwards to the wrapped snapshot.
+ */
+class DropwizardHistogramStatistics extends HistogramStatistics {
+
+	/** Dropwizard snapshot that all statistics are read from. */
+	private final Snapshot delegate;
+
+	DropwizardHistogramStatistics(Snapshot snapshot) {
+		delegate = snapshot;
+	}
+
+	// ---- sample access ----
+
+	@Override
+	public int size() {
+		return delegate.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return delegate.getValues();
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		// Dropwizard calls the quantile lookup "getValue"
+		return delegate.getValue(quantile);
+	}
+
+	// ---- aggregates ----
+
+	@Override
+	public long getMin() {
+		return delegate.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return delegate.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return delegate.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return delegate.getStdDev();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
new file mode 100644
index 0000000..79a6a56
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * Adapter that lets a Dropwizard {@link com.codahale.metrics.Histogram} be used wherever a
+ * Flink {@link Histogram} is expected.
+ */
+public class DropwizardHistogramWrapper implements Histogram {
+
+	/** The wrapped Dropwizard histogram; all calls are forwarded to it. */
+	private final com.codahale.metrics.Histogram dropwizardHistogram;
+
+	public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
+		this.dropwizardHistogram = dropwizardHistogram;
+	}
+
+	/**
+	 * Returns the wrapped Dropwizard histogram.
+	 *
+	 * <p>The method name keeps its historical spelling because callers refer to it.
+	 */
+	public com.codahale.metrics.Histogram getDropwizarHistogram() {
+		return this.dropwizardHistogram;
+	}
+
+	@Override
+	public long getCount() {
+		return this.dropwizardHistogram.getCount();
+	}
+
+	@Override
+	public void update(long value) {
+		this.dropwizardHistogram.update(value);
+	}
+
+	@Override
+	public HistogramStatistics getStatistics() {
+		// take a snapshot of the Dropwizard histogram and expose it through Flink's statistics type
+		final com.codahale.metrics.Snapshot current = this.dropwizardHistogram.getSnapshot();
+		return new DropwizardHistogramStatistics(current);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
new file mode 100644
index 0000000..a44c3f5
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Wrapper to use a Flink {@link Counter} as a Dropwizard {@link com.codahale.metrics.Counter}.
+ * This is necessary to report Flink's counters via the Dropwizard
+ * {@link com.codahale.metrics.Reporter}.
+ *
+ * <p>All reads and updates are forwarded to the wrapped Flink counter. The count kept by
+ * the Dropwizard superclass is never used, so updates made through this wrapper are
+ * visible via {@link #getCount()} and on the Flink counter itself.
+ */
+public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
+	private final Counter counter;
+
+	/**
+	 * @param counter the Flink counter that backs this Dropwizard counter
+	 */
+	public FlinkCounterWrapper(Counter counter) {
+		this.counter = counter;
+	}
+
+	/** Returns the current count of the wrapped Flink counter. */
+	@Override
+	public long getCount() {
+		return this.counter.getCount();
+	}
+
+	// The mutators are overridden as well: without them an update would only change the
+	// superclass's internal count, which getCount() above never reads, and would be lost.
+
+	@Override
+	public void inc() {
+		this.counter.inc();
+	}
+
+	@Override
+	public void inc(long n) {
+		this.counter.inc(n);
+	}
+
+	@Override
+	public void dec() {
+		this.counter.dec();
+	}
+
+	@Override
+	public void dec(long n) {
+		this.counter.dec(n);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
new file mode 100644
index 0000000..058ecad
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Adapter that exposes a Flink {@link Gauge} through the Dropwizard
+ * {@link com.codahale.metrics.Gauge} interface.
+ *
+ * @param <T> type of the value reported by the gauge
+ */
+public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+
+	/** The Flink gauge whose value is reported. */
+	private final Gauge<T> flinkGauge;
+
+	public FlinkGaugeWrapper(Gauge<T> gauge) {
+		flinkGauge = gauge;
+	}
+
+	/**
+	 * Wraps a gauge whose value type is only known as a wildcard.
+	 *
+	 * @param gauge the Flink gauge to wrap
+	 * @param <T> value type the caller wants the wrapper to be typed with
+	 * @return a wrapper around {@code gauge}
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge) {
+		// unchecked: the wildcard gauge is simply re-typed to whatever T the caller picked
+		return new FlinkGaugeWrapper<T>((Gauge<T>) gauge);
+	}
+
+	@Override
+	public T getValue() {
+		return flinkGauge.getValue();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
new file mode 100644
index 0000000..8bd8078
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.Histogram;
+
+/**
+ * Adapter that presents a Flink {@link Histogram} as a Dropwizard
+ * {@link com.codahale.metrics.Histogram}, so that Flink histograms can be handed to a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
+
+	/** The Flink histogram that serves the three methods overridden below. */
+	private final Histogram flinkHistogram;
+
+	public FlinkHistogramWrapper(Histogram histogram) {
+		// no reservoir is handed to the Dropwizard base class
+		super(null);
+		flinkHistogram = histogram;
+	}
+
+	@Override
+	public long getCount() {
+		return flinkHistogram.getCount();
+	}
+
+	@Override
+	public void update(long value) {
+		flinkHistogram.update(value);
+	}
+
+	@Override
+	public Snapshot getSnapshot() {
+		// Flink statistics are exposed through the Dropwizard snapshot type
+		return new HistogramStatisticsWrapper(flinkHistogram.getStatistics());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
new file mode 100644
index 0000000..6d3a69b
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+/**
+ * Adapter that presents Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot},
+ * so that Flink histograms can be reported through a Dropwizard
+ * {@link com.codahale.metrics.Reporter}.
+ */
+class HistogramStatisticsWrapper extends Snapshot {
+
+	private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+	/** The Flink statistics every accessor forwards to. */
+	private final HistogramStatistics stats;
+
+	HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
+		stats = histogramStatistics;
+	}
+
+	// ---- sample access ----
+
+	@Override
+	public int size() {
+		return stats.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return stats.getValues();
+	}
+
+	@Override
+	public double getValue(double quantile) {
+		// Flink calls the quantile lookup "getQuantile"
+		return stats.getQuantile(quantile);
+	}
+
+	// ---- aggregates ----
+
+	@Override
+	public long getMin() {
+		return stats.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return stats.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return stats.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return stats.getStdDev();
+	}
+
+	/**
+	 * Writes every value of the statistics to {@code output} as UTF-8 text, one value per
+	 * line. The writer wrapping {@code output} is closed when the method returns.
+	 */
+	@Override
+	public void dump(OutputStream output) {
+		final OutputStreamWriter utf8Writer = new OutputStreamWriter(output, UTF_8);
+		try (PrintWriter out = new PrintWriter(utf8Writer)) {
+			final long[] values = stats.getValues();
+			for (long value : values) {
+				out.printf("%d%n", value);
+			}
+		}
+	}
+}