You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 13:59:18 UTC

[1/3] flink git commit: [FLINK-4093] Expose metric interfaces

Repository: flink
Updated Branches:
  refs/heads/master 62cb954d9 -> ee3c7a88b


[FLINK-4093] Expose metric interfaces

This closes #2134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d43bf8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d43bf8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d43bf8d9

Branch: refs/heads/master
Commit: d43bf8d9b3085d1341bfca61e05c2a77e5426226
Parents: 62cb954
Author: zentol <ch...@apache.org>
Authored: Wed Jun 22 10:37:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 15:23:27 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/metrics/Counter.java  | 24 ++-----
 .../java/org/apache/flink/metrics/Gauge.java    |  5 +-
 .../org/apache/flink/metrics/MetricGroup.java   | 32 +++++++--
 .../org/apache/flink/metrics/SimpleCounter.java | 71 ++++++++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 16 ++++-
 .../groups/UnregisteredMetricsGroup.java        | 19 ++++--
 .../flink/runtime/taskmanager/TaskManager.scala | 41 ++++++-----
 .../partition/consumer/InputChannelTest.java    |  4 +-
 8 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index acc37cf..ffb1cc7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Counter is a {@link Metric} that measures a count.
  */
 @PublicEvolving
-public final class Counter implements Metric {
-
-	private long count;
+public interface Counter extends Metric {
 
 	/**
 	 * Increment the current count by 1.
 	 */
-	public void inc() {
-		count++;
-	}
+	void inc();
 
 	/**
 	 * Increment the current count by the given value.
 	 *
 	 * @param n value to increment the current count by
 	 */
-	public void inc(long n) {
-		count += n;
-	}
+	void inc(long n);
 
 	/**
 	 * Decrement the current count by 1.
 	 */
-	public void dec() {
-		count--;
-	}
+	void dec();
 
 	/**
 	 * Decrement the current count by the given value.
 	 *
 	 * @param n value to decrement the current count by
 	 */
-	public void dec(long n) {
-		count -= n;
-	}
+	void dec(long n);
 
 	/**
 	 * Returns the current count.
 	 *
 	 * @return current count
 	 */
-	public long getCount() {
-		return count;
-	}
+	long getCount();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index aad8deb..740645d 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
  */
 @PublicEvolving
-public abstract class Gauge<T> implements Metric {
-
+public interface Gauge<T> extends Metric {
 	/**
 	 * Calculates and returns the measured value.
 	 *
 	 * @return calculated value
 	 */
-	public abstract T getValue();
+	T getValue();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 6c9e044..b131949 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -63,7 +63,7 @@ public interface MetricGroup {
 	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
 	 *
 	 * @param name name of the counter
-	 * @return the registered counter
+	 * @return the created counter
 	 */
 	Counter counter(int name);
 
@@ -71,19 +71,39 @@ public interface MetricGroup {
 	 * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
 	 *
 	 * @param name name of the counter
-	 * @return the registered counter
+	 * @return the created counter
 	 */
 	Counter counter(String name);
 
 	/**
+	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+	 *
+	 * @param name    name of the counter
+	 * @param counter counter to register
+	 * @param <C>     counter type
+	 * @return the given counter
+	 */
+	<C extends Counter> C counter(int name, C counter);
+
+	/**
+	 * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+	 *
+	 * @param name    name of the counter
+	 * @param counter counter to register
+	 * @param <C>     counter type
+	 * @return the given counter
+	 */
+	<C extends Counter> C counter(String name, C counter);
+	
+	/**
 	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
 	 *
 	 * @param name  name of the gauge
 	 * @param gauge gauge to register
 	 * @param <T>   return type of the gauge
-	 * @return the registered gauge
+	 * @return the given gauge
 	 */
-	<T> Gauge<T> gauge(int name, Gauge<T> gauge);
+	<T, G extends Gauge<T>> G gauge(int name, G gauge);
 
 	/**
 	 * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
@@ -91,9 +111,9 @@ public interface MetricGroup {
 	 * @param name  name of the gauge
 	 * @param gauge gauge to register
 	 * @param <T>   return type of the gauge
-	 * @return the registered gauge
+	 * @return the given gauge
 	 */
-	<T> Gauge<T> gauge(String name, Gauge<T> gauge);
+	<T, G extends Gauge<T>> G gauge(String name, G gauge);
 
 	// ------------------------------------------------------------------------
 	// Groups

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
new file mode 100644
index 0000000..9720b08
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe.
+ */
+public class SimpleCounter implements Counter {
+	private long value;
+
+	/**
+	 * Adds one to the counter.
+	 */
+	@Override
+	public void inc() {
+		value += 1;
+	}
+
+	/**
+	 * Adds the given amount to the counter.
+	 *
+	 * @param n amount to add to the counter
+	 */
+	@Override
+	public void inc(long n) {
+		value = value + n;
+	}
+
+	/**
+	 * Subtracts one from the counter.
+	 */
+	@Override
+	public void dec() {
+		value -= 1;
+	}
+
+	/**
+	 * Subtracts the given amount from the counter.
+	 *
+	 * @param n amount to subtract from the counter
+	 */
+	@Override
+	public void dec(long n) {
+		value = value - n;
+	}
+
+	/**
+	 * Reads the counter.
+	 *
+	 * @return the value accumulated so far
+	 */
+	@Override
+	public long getCount() {
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 032fa04..93eb734 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.SimpleCounter;
 
 import org.apache.flink.metrics.groups.scope.ScopeFormat;
 import org.slf4j.Logger;
@@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 
 	@Override
 	public Counter counter(String name) {
-		Counter counter = new Counter();
+		return counter(name, new SimpleCounter());
+	}
+	
+	@Override
+	public <C extends Counter> C counter(int name, C counter) {
+		return counter(String.valueOf(name), counter);
+	}
+
+	@Override
+	public <C extends Counter> C counter(String name, C counter) {
 		addMetric(name, counter);
 		return counter;
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
 		return gauge(String.valueOf(name), gauge);
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
 		addMetric(name, gauge);
 		return gauge;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 961bcce..29d71d9 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 
 /**
  * A special {@link MetricGroup} that does not register any metrics at the metrics registry
@@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements MetricGroup {
 
 	@Override
 	public Counter counter(int name) {
-		return new Counter();
+		return new SimpleCounter();
 	}
 
 	@Override
 	public Counter counter(String name) {
-		return new Counter();
+		return new SimpleCounter();
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+	public <C extends Counter> C counter(int name, C counter) {
+		return counter;
+	}
+
+	@Override
+	public <C extends Counter> C counter(String name, C counter) {
+		return counter;
+	}
+
+	@Override
+	public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
 		return gauge;
 	}
 
 	@Override
-	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+	public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
 		return gauge;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ef22af..1fb0e09 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2294,11 +2294,10 @@ object TaskManager {
   private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getClassLoadingMXBean
 
-    metrics
-      .gauge("ClassesLoaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getTotalLoadedClassCount
     })
-    metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getUnloadedClassCount
     })
   }
@@ -2308,10 +2307,10 @@ object TaskManager {
 
     for (garbageCollector <- garbageCollectors) {
       val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
-      gcGroup.gauge("Count", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionCount
       })
-      gcGroup.gauge("Time", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionTime
       })
     }
@@ -2320,24 +2319,24 @@ object TaskManager {
   private def instantiateMemoryMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getMemoryMXBean
     val heap = metrics.addGroup("Heap")
-    heap.gauge("Used", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
     })
-    heap.gauge("Committed", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
       })
-    heap.gauge("Max", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
       })
 
     val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge("Used", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
       })
-    nonHeap.gauge("Committed", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
       })
-    nonHeap.gauge("Max", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
       })
 
@@ -2346,15 +2345,15 @@ object TaskManager {
     val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
 
     val direct = metrics.addGroup("Direct")
-    direct.gauge("Count", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "Count").asInstanceOf[Long]
       })
-    direct.gauge("MemoryUsed", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    direct.gauge("TotalCapacity", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2362,15 +2361,15 @@ object TaskManager {
     val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
 
     val mapped = metrics.addGroup("Mapped")
-    mapped.gauge("Count", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
       })
-    mapped.gauge("MemoryUsed", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    mapped.gauge("TotalCapacity", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2379,8 +2378,7 @@ object TaskManager {
   private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
     val mxBean = ManagementFactory.getThreadMXBean
 
-    metrics
-      .gauge("Count", new FlinkGauge[Int] {
+    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
       override def getValue: Int = mxBean.getThreadCount
     })
   }
@@ -2390,11 +2388,10 @@ object TaskManager {
       val mxBean = ManagementFactory.getOperatingSystemMXBean
         .asInstanceOf[com.sun.management.OperatingSystemMXBean]
 
-      metrics
-        .gauge("Load", new FlinkGauge[Double] {
+      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
           override def getValue: Double = mxBean.getProcessCpuLoad
         })
-      metrics.gauge("Time", new FlinkGauge[Long] {
+      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
           override def getValue: Long = mxBean.getProcessCpuTime
         })
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index da15f08..0868398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -119,7 +119,7 @@ public class InputChannelTest {
 				ResultPartitionID partitionId,
 				Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new Counter());
+			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter());
 		}
 
 		@Override


[2/3] flink git commit: [FLINK-3951] Add Histogram metric type

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
new file mode 100644
index 0000000..2479c26
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
+
+	/** Checks count, min, max, size and median reported by a DropwizardHistogramWrapper. */
+	@Test
+	public void testDropwizardHistogramWrapper() {
+		final int windowSize = 10;
+		DropwizardHistogramWrapper wrapper = new DropwizardHistogramWrapper(
+			new com.codahale.metrics.Histogram(new SlidingWindowReservoir(windowSize)));
+
+		// first pass: feed 0 .. windowSize - 1, the minimum is expected to stay at 0
+		for (int value = 0; value < windowSize; value++) {
+			wrapper.update(value);
+
+			assertEquals(value + 1, wrapper.getCount());
+			assertEquals(value, wrapper.getStatistics().getMax());
+			assertEquals(0, wrapper.getStatistics().getMin());
+		}
+
+		assertEquals(windowSize, wrapper.getStatistics().size());
+		assertEquals((windowSize - 1)/2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+
+		// second pass: feed windowSize .. 2 * windowSize - 1, the expected minimum now moves up
+		for (int value = windowSize; value < 2 * windowSize; value++) {
+			wrapper.update(value);
+
+			assertEquals(value + 1, wrapper.getCount());
+			assertEquals(value, wrapper.getStatistics().getMax());
+			assertEquals(value + 1 - windowSize, wrapper.getStatistics().getMin());
+		}
+
+		assertEquals(windowSize, wrapper.getStatistics().size());
+		assertEquals(windowSize + (windowSize - 1)/2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+	}
+
+	/**
+	 * Registers a DropwizardHistogramWrapper, waits for the next report issued through the
+	 * ScheduledReporter, and checks the dropwizard snapshot that was reported.
+	 */
+	@Test
+	public void testDropwizardHistogramWrapperReporting() throws Exception {
+		final long reportIntervalMs = 1000;
+		final long waitTimeoutMs = 30000;
+		final int windowSize = 10;
+		final String metricName = "histogram";
+
+		Configuration configuration = new Configuration();
+		configuration.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName());
+		configuration.setString(KEY_METRICS_REPORTER_INTERVAL, reportIntervalMs + " MILLISECONDS");
+
+		MetricRegistry metricRegistry = null;
+
+		try {
+			metricRegistry = new MetricRegistry(configuration);
+
+			DropwizardHistogramWrapper wrapper = new DropwizardHistogramWrapper(
+				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(windowSize)));
+			TaskManagerMetricGroup group = new TaskManagerMetricGroup(metricRegistry, "localhost", "tmId");
+
+			group.histogram(metricName, wrapper);
+
+			String qualifiedName = group.getScopeString() + "." + metricName;
+
+			// read the registry's "reporter" field reflectively to get hold of the reporter instance
+			Field reporterField = metricRegistry.getClass().getDeclaredField("reporter");
+			reporterField.setAccessible(true);
+			MetricReporter rawReporter = (MetricReporter) reporterField.get(metricRegistry);
+
+			assertTrue(rawReporter instanceof TestingReporter);
+			TestingReporter testingReporter = (TestingReporter) rawReporter;
+			TestingScheduledReporter scheduled = testingReporter.scheduledReporter;
+
+			// exactly one metric must be registered at this point
+			assertEquals(1, testingReporter.getMetrics().size());
+
+			for (int value = 0; value < windowSize; value++) {
+				wrapper.update(value);
+			}
+
+			// block until the next report delivers a snapshot for the histogram
+			Future<Snapshot> pendingSnapshot = scheduled.getNextHistogramSnapshot(qualifiedName);
+
+			Snapshot reported = pendingSnapshot.get(waitTimeoutMs, TimeUnit.MILLISECONDS);
+
+			assertEquals(0, reported.getMin());
+			assertEquals((windowSize - 1) / 2.0, reported.getMedian(), 0.001);
+			assertEquals(windowSize - 1, reported.getMax());
+			assertEquals(windowSize, reported.size());
+
+			metricRegistry.unregister(wrapper, "histogram", group);
+
+			// after unregistering, no metric may be left
+			assertEquals(0, testingReporter.getMetrics().size());
+		} finally {
+			if (metricRegistry != null) {
+				metricRegistry.shutdown();
+			}
+		}
+	}
+
+	/** Reporter that keeps the scheduled reporter it creates so that the test can query it. */
+	public static class TestingReporter extends ScheduledDropwizardReporter {
+		TestingScheduledReporter scheduledReporter;
+
+		@Override
+		public ScheduledReporter getReporter(Configuration config) {
+			final String reporterName = getClass().getName();
+			final TimeUnit unit = TimeUnit.MILLISECONDS;
+			// null is passed as the metric filter; both rate and duration unit are milliseconds
+			scheduledReporter = new TestingScheduledReporter(registry, reporterName, null, unit, unit);
+
+			return scheduledReporter;
+		}
+
+		/** Returns the metrics held by {@code registry} (not declared here; presumably inherited). */
+		public Map<String, com.codahale.metrics.Metric> getMetrics() {
+			return registry.getMetrics();
+		}
+	}
+
+	/** Scheduled reporter that records histogram snapshots and completes the futures waiting for them. */
+	static class TestingScheduledReporter extends ScheduledReporter {
+
+		final Map<String, Snapshot> histogramSnapshots = new HashMap<>();
+		final Map<String, List<CompletableFuture<Snapshot>>> histogramSnapshotFutures = new HashMap<>();
+
+		protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) {
+			super(registry, name, filter, rateUnit, durationUnit);
+		}
+
+		@Override
+		public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, com.codahale.metrics.Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+			for (Map.Entry<String, com.codahale.metrics.Histogram> histogramEntry : histograms.entrySet()) {
+				reportHistogram(histogramEntry.getKey(), histogramEntry.getValue());
+			}
+		}
+
+		/** Stores the current snapshot under the given name and completes every future pending for that name. */
+		void reportHistogram(String name, com.codahale.metrics.Histogram histogram) {
+			histogramSnapshots.put(name, histogram.getSnapshot());
+
+			synchronized (histogramSnapshotFutures) {
+				List<CompletableFuture<Snapshot>> pending = histogramSnapshotFutures.remove(name);
+				if (pending == null) {
+					return;
+				}
+				for (CompletableFuture<Snapshot> waiter : pending) {
+					waiter.complete(histogram.getSnapshot());
+				}
+			}
+		}
+
+		/** Returns a future that the next {@code reportHistogram} call for the given name completes. */
+		Future<Snapshot> getNextHistogramSnapshot(String name) {
+			CompletableFuture<Snapshot> next = new CompletableFuture<>();
+			synchronized (histogramSnapshotFutures) {
+				List<CompletableFuture<Snapshot>> pending = histogramSnapshotFutures.get(name);
+				if (pending == null) {
+					pending = new ArrayList<>();
+					histogramSnapshotFutures.put(name, pending);
+				}
+				pending.add(next);
+			}
+
+			return next;
+		}
+	}
+
+	/**
+	 * Minimal {@link Future} that is completed or failed explicitly, possibly by another thread.
+	 * All state is guarded by {@code lock}; a {@code null} value is treated as "not completed".
+	 */
+	static class CompletableFuture<T> implements Future<T> {
+
+		private final Object lock = new Object();
+		private Exception exception = null;
+		private T value = null;
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			// Cancellation is stored as a failure, so get() reports it wrapped in an ExecutionException.
+			return fail(new CancellationException("Future was cancelled."));
+		}
+
+		@Override
+		public boolean isCancelled() {
+			synchronized (lock) {
+				return exception instanceof CancellationException;
+			}
+		}
+
+		@Override
+		public boolean isDone() {
+			synchronized (lock) {
+				return value != null || exception != null;
+			}
+		}
+
+		@Override
+		public T get() throws InterruptedException, ExecutionException {
+			synchronized (lock) {
+				// Test and wait under the same monitor so that a concurrent notifyAll() cannot be missed.
+				while (!isDone()) {
+					lock.wait();
+				}
+				return result();
+			}
+		}
+
+		/** Waits at most the given time and throws a TimeoutException if the future is still incomplete then. */
+		@Override
+		public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+
+			synchronized (lock) {
+				while (!isDone()) {
+					long remainingMs = deadline - System.currentTimeMillis();
+					if (remainingMs <= 0) {
+						throw new TimeoutException("Future was not completed within " + timeout + " " + unit + ".");
+					}
+					// The remaining time is already in milliseconds and must not be converted again.
+					lock.wait(remainingMs);
+				}
+				return result();
+			}
+		}
+
+		/**
+		 * Completes this future with the given value and wakes up all waiting threads.
+		 *
+		 * @return true if this call completed the future, false if it was already done
+		 */
+		public boolean complete(T value) {
+			synchronized (lock) {
+				if (isDone()) {
+					return false;
+				}
+				this.value = value;
+				lock.notifyAll();
+				return true;
+			}
+		}
+
+		/**
+		 * Fails this future with the given exception and wakes up all waiting threads.
+		 *
+		 * @return true if this call failed the future, false if it was already done
+		 */
+		public boolean fail(Exception exception) {
+			synchronized (lock) {
+				if (isDone()) {
+					return false;
+				}
+				this.exception = exception;
+				lock.notifyAll();
+				return true;
+			}
+		}
+
+		/** Returns the value or rethrows the failure wrapped in an ExecutionException; caller holds the lock. */
+		private T result() throws ExecutionException {
+			if (exception != null) {
+				throw new ExecutionException(exception);
+			}
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml
new file mode 100644
index 0000000..c4f51da
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-ganglia</artifactId>
+	<name>flink-metrics-ganglia</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>info.ganglia.gmetric4j</groupId>
+			<artifactId>gmetric4j</artifactId>
+			<version>1.0.7</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-ganglia</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
new file mode 100644
index 0000000..adf9394
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.ganglia;
+
+import com.codahale.metrics.ScheduledReporter;
+
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that builds a Dropwizard Ganglia reporter from the reporter configuration.
+ *
+ * <p>Besides host and port, the configuration may carry the addressing mode, the ttl,
+ * a metric prefix, the rate/duration conversion units and the dmax/tmax values.
+ */
+@PublicEvolving
+public class GangliaReporter extends ScheduledDropwizardReporter {
+
+	public static final String ARG_DMAX = "dmax";
+	public static final String ARG_TMAX = "tmax";
+	public static final String ARG_TTL = "ttl";
+	public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+	/**
+	 * Creates the Dropwizard Ganglia reporter for the given configuration.
+	 *
+	 * @param config reporter configuration; must contain a non-empty host and a port of at least 1
+	 * @return the configured reporter
+	 * @throws IllegalArgumentException if host or port are missing or invalid
+	 * @throws RuntimeException if creating the GMetric instance fails with an IOException
+	 */
+	@Override
+	public ScheduledReporter getReporter(Configuration config) {
+
+		try {
+			String host = config.getString(ARG_HOST, null);
+			int port = config.getInteger(ARG_PORT, -1);
+			if (host == null || host.length() == 0 || port < 1) {
+				throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+			}
+			String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+			// Default to a ttl of 1: a negative ttl is outside the 0..255 range accepted for
+			// multicast sockets, so the default addressing mode could not be used without
+			// explicitly configuring a ttl.
+			// NOTE(review): assumes GMetric applies the ttl to a multicast socket - confirm.
+			int ttl = config.getInteger(ARG_TTL, 1);
+			GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
+
+			String prefix = config.getString(ARG_PREFIX, null);
+			String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
+			String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
+			int dMax = config.getInteger(ARG_DMAX, 0);
+			int tMax = config.getInteger(ARG_TMAX, 60);
+
+			com.codahale.metrics.ganglia.GangliaReporter.Builder builder =
+				com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+			// optional settings are only applied when configured, otherwise the builder defaults are kept
+			if (prefix != null) {
+				builder.prefixedWith(prefix);
+			}
+			if (conversionRate != null) {
+				builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
+			}
+			if (conversionDuration != null) {
+				builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
+			}
+			builder.withDMax(dMax);
+			builder.withTMax(tMax);
+
+			return builder.build(gMetric);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while instantiating GangliaReporter.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml
new file mode 100644
index 0000000..45fb018
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-graphite</artifactId>
+	<name>flink-metrics-graphite</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
new file mode 100644
index 0000000..16be830
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that builds a Dropwizard Graphite reporter from the reporter configuration.
+ */
+@PublicEvolving
+public class GraphiteReporter extends ScheduledDropwizardReporter {
+
+	/**
+	 * Creates the Dropwizard Graphite reporter for the given configuration.
+	 *
+	 * @param config reporter configuration; must contain a non-empty host and a port of at least 1
+	 * @return the configured reporter
+	 * @throws IllegalArgumentException if host or port are missing or invalid
+	 */
+	@Override
+	public ScheduledReporter getReporter(Configuration config) {
+		final String graphiteHost = config.getString(ARG_HOST, null);
+		final int graphitePort = config.getInteger(ARG_PORT, -1);
+
+		final boolean hostMissing = graphiteHost == null || graphiteHost.length() == 0;
+		if (hostMissing || graphitePort < 1) {
+			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + graphiteHost + " Port: " + graphitePort);
+		}
+
+		final com.codahale.metrics.graphite.GraphiteReporter.Builder reporterBuilder =
+			com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
+
+		// each optional setting is applied only when it is present in the configuration
+		final String metricPrefix = config.getString(ARG_PREFIX, null);
+		if (metricPrefix != null) {
+			reporterBuilder.prefixedWith(metricPrefix);
+		}
+
+		final String rateUnit = config.getString(ARG_CONVERSION_RATE, null);
+		if (rateUnit != null) {
+			reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+		}
+
+		final String durationUnit = config.getString(ARG_CONVERSION_DURATION, null);
+		if (durationUnit != null) {
+			reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+		}
+
+		final Graphite graphite = new Graphite(graphiteHost, graphitePort);
+		return reporterBuilder.build(graphite);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml
new file mode 100644
index 0000000..8ee0b56
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-statsd</artifactId>
+	<name>flink-metrics-statsd</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
new file mode 100644
index 0000000..3d9bf07
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Reporter that sends gauges, counters and histograms to a StatsD server as UDP datagrams.
+ * Every value is sent in the form {@code name:value|g}.
+ *
+ * <p>Largely based on the StatsDReporter class by ReadyTalk
+ * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * <p>Ported since it was not present in maven central.
+ */
+@PublicEvolving
+public class StatsDReporter extends AbstractReporter implements Scheduled {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
+
+	public static final String ARG_HOST = "host";
+	public static final String ARG_PORT = "port";
+//	public static final String ARG_CONVERSION_RATE = "rateConversion";
+//	public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+	// volatile: written by close() and read by report(), which may run on different threads
+	private volatile boolean closed = false;
+
+	private DatagramSocket socket;
+	private InetSocketAddress address;
+
+	/**
+	 * Reads host and port from the configuration and opens the datagram socket.
+	 *
+	 * @param config reporter configuration; must contain a non-empty host and a port of at least 1
+	 * @throws IllegalArgumentException if host or port are missing or invalid
+	 * @throws RuntimeException if the datagram socket cannot be created
+	 */
+	@Override
+	public void open(Configuration config) {
+		String host = config.getString(ARG_HOST, null);
+		int port = config.getInteger(ARG_PORT, -1);
+
+		if (host == null || host.length() == 0 || port < 1) {
+			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+		}
+
+		this.address = new InetSocketAddress(host, port);
+
+//		String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
+//		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+//		this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+//		this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
+
+		try {
+			this.socket = new DatagramSocket(0);
+		} catch (SocketException e) {
+			throw new RuntimeException("Could not create datagram socket. ", e);
+		}
+	}
+
+	/**
+	 * Marks the reporter as closed and closes the socket if it is still open.
+	 */
+	@Override
+	public void close() {
+		closed = true;
+		if (socket != null && !socket.isClosed()) {
+			socket.close();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sends the current value of every registered gauge, counter and histogram.
+	 * Stops early once the reporter has been closed.
+	 */
+	@Override
+	public void report() {
+		// instead of locking here, we tolerate exceptions
+		// we do this to prevent holding the lock for very long and blocking
+		// operator creation and shutdown
+		try {
+			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+				if (closed) {
+					return;
+				}
+				reportGauge(entry.getValue(), entry.getKey());
+			}
+
+			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+				if (closed) {
+					return;
+				}
+				reportCounter(entry.getValue(), entry.getKey());
+			}
+
+			for (Map.Entry<Histogram, String> entry : histograms.entrySet()) {
+				// same early exit as for gauges and counters: do not send on a closed socket
+				if (closed) {
+					return;
+				}
+				reportHistogram(entry.getValue(), entry.getKey());
+			}
+		}
+		catch (ConcurrentModificationException | NoSuchElementException e) {
+			// ignore - may happen when metrics are concurrently added or removed
+			// report next time
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Sends the current count of the given counter. */
+	private void reportCounter(final String name, final Counter counter) {
+		send(name, String.valueOf(counter.getCount()));
+	}
+
+	/** Sends the string form of the gauge's current value; null values are skipped. */
+	private void reportGauge(final String name, final Gauge<?> gauge) {
+		Object value = gauge.getValue();
+		if (value != null) {
+			send(name, value.toString());
+		}
+	}
+
+	/** Sends count, max, min, mean, stddev and a fixed set of quantiles, each under its own sub-name. */
+	private void reportHistogram(final String name, final Histogram histogram) {
+		if (histogram != null) {
+
+			HistogramStatistics statistics = histogram.getStatistics();
+
+			if (statistics != null) {
+				send(prefix(name, "count"), String.valueOf(histogram.getCount()));
+				send(prefix(name, "max"), String.valueOf(statistics.getMax()));
+				send(prefix(name, "min"), String.valueOf(statistics.getMin()));
+				send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
+				send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
+				send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
+				send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
+				send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
+				send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
+				send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
+				send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+			}
+		}
+	}
+
+	/** Joins the given names with '.'; returns an empty string when no names are given. */
+	private String prefix(String ... names) {
+		if (names.length > 0) {
+			StringBuilder stringBuilder = new StringBuilder(names[0]);
+
+			for (int i = 1; i < names.length; i++) {
+				stringBuilder.append('.').append(names[i]);
+			}
+
+			return stringBuilder.toString();
+		} else {
+			return "";
+		}
+	}
+
+	/** Formats the metric as {@code name:value|g} and sends it as one datagram; send failures are logged. */
+	private void send(final String name, final String value) {
+		try {
+			String formatted = String.format("%s:%s|g", name, value);
+			// encode with a fixed charset so the bytes on the wire do not depend on the platform default
+			byte[] data = formatted.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+			socket.send(new DatagramPacket(data, data.length, this.address));
+		}
+		catch (IOException e) {
+			// pass the exception as the last argument so the cause is logged as well
+			LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
new file mode 100644
index 0000000..5f5ef40
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+
+public class StatsDReporterTest extends TestLogger {
+
+	/**
+	 * Tests that histograms are properly reported via the StatsD reporter
+	 */
+	@Test
+	public void testStatsDHistogramReporting() throws Exception {
+		MetricRegistry registry = null;
+		DatagramSocketReceiver receiver = null;
+		Thread receiverThread = null;
+		long timeout = 5000;
+		long joinTimeout = 30000;
+
+		String histogramName = "histogram";
+
+		try {
+			receiver = new DatagramSocketReceiver();
+
+			receiverThread = new Thread(receiver);
+
+			receiverThread.start();
+
+			int port = receiver.getPort();
+
+			Configuration config = new Configuration();
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS");
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
+
+			registry = new MetricRegistry(config);
+
+			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+			TestingHistogram histogram = new TestingHistogram();
+
+			metricGroup.histogram(histogramName, histogram);
+
+			receiver.waitUntilNumLines(11, timeout);
+
+			Set<String> lines = receiver.getLines();
+
+			String prefix = metricGroup.getScopeString() + "." + histogramName;
+
+			Set<String> expectedLines = new HashSet<>();
+
+			expectedLines.add(prefix + ".count:1|g");
+			expectedLines.add(prefix + ".mean:3.0|g");
+			expectedLines.add(prefix + ".min:6|g");
+			expectedLines.add(prefix + ".max:5|g");
+			expectedLines.add(prefix + ".stddev:4.0|g");
+			expectedLines.add(prefix + ".p75:0.75|g");
+			expectedLines.add(prefix + ".p98:0.98|g");
+			expectedLines.add(prefix + ".p99:0.99|g");
+			expectedLines.add(prefix + ".p999:0.999|g");
+			expectedLines.add(prefix + ".p95:0.95|g");
+			expectedLines.add(prefix + ".p50:0.5|g");
+
+			assertEquals(expectedLines, lines);
+
+		} finally {
+			if (registry != null) {
+				registry.shutdown();
+			}
+
+			if (receiver != null) {
+				receiver.stop();
+			}
+
+			if (receiverThread != null) {
+				receiverThread.join(joinTimeout);
+			}
+		}
+	}
+
+	public static class TestingHistogram implements Histogram {
+
+		@Override
+		public void update(long value) {
+
+		}
+
+		@Override
+		public long getCount() {
+			return 1;
+		}
+
+		@Override
+		public HistogramStatistics getStatistics() {
+			return new HistogramStatistics() {
+				@Override
+				public double getQuantile(double quantile) {
+					return quantile;
+				}
+
+				@Override
+				public long[] getValues() {
+					return new long[0];
+				}
+
+				@Override
+				public int size() {
+					return 2;
+				}
+
+				@Override
+				public double getMean() {
+					return 3;
+				}
+
+				@Override
+				public double getStdDev() {
+					return 4;
+				}
+
+				@Override
+				public long getMax() {
+					return 5;
+				}
+
+				@Override
+				public long getMin() {
+					return 6;
+				}
+			};
+		}
+	}
+
+	public static class DatagramSocketReceiver implements Runnable {
+		private static final Object obj = new Object();
+
+		private final DatagramSocket socket;
+		private final ConcurrentHashMap<String, Object> lines;
+
+		private boolean running = true;
+
+		public DatagramSocketReceiver() throws SocketException {
+			socket = new DatagramSocket();
+			lines = new ConcurrentHashMap<>();
+		}
+
+		public int getPort() {
+			return socket.getLocalPort();
+		}
+
+		public void stop() {
+			running = false;
+			socket.close();
+		}
+
+		public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException {
+			long endTimeout = System.currentTimeMillis() + timeout;
+			long remainingTimeout = timeout;
+
+			while (numberLines > lines.size() && remainingTimeout > 0) {
+				synchronized (lines) {
+					try {
+						lines.wait(remainingTimeout);
+					} catch (InterruptedException e) {
+						// ignore interruption exceptions
+					}
+				}
+
+				remainingTimeout = endTimeout - System.currentTimeMillis();
+			}
+
+			if (remainingTimeout <= 0) {
+				throw new TimeoutException("Have not received " + numberLines + " in time.");
+			}
+		}
+
+		public Set<String> getLines() {
+			return lines.keySet();
+		}
+
+		@Override
+		public void run() {
+			while (running) {
+				try {
+					byte[] buffer = new byte[1024];
+
+					DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+					socket.receive(packet);
+
+					String line = new String(packet.getData(), 0, packet.getLength());
+
+					lines.put(line, obj);
+
+					synchronized (lines) {
+						lines.notifyAll();
+					}
+				} catch (IOException ex) {
+					// ignore the exceptions
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
new file mode 100644
index 0000000..542f49c
--- /dev/null
+++ b/flink-metrics/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics</artifactId>
+	<name>flink-metrics</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-metrics-dropwizard</module>
+		<module>flink-metrics-ganglia</module>
+		<module>flink-metrics-graphite</module>
+		<module>flink-metrics-statsd</module>
+	</modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9dc8846..9da3fa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
 		<module>flink-quickstart</module>
 		<module>flink-contrib</module>
 		<module>flink-dist</module>
-		<module>flink-metric-reporters</module>
+		<module>flink-metrics</module>
 	</modules>
 
 	<properties>


[3/3] flink git commit: [FLINK-3951] Add Histogram metric type

Posted by ch...@apache.org.
[FLINK-3951] Add Histogram metric type

This closes #2112


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c7a88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c7a88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c7a88

Branch: refs/heads/master
Commit: ee3c7a88bb74232e4884899699aaa08ae2b6e038
Parents: d43bf8d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 14 19:04:43 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 15:32:03 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/metrics/Histogram.java     |  52 +++
 .../flink/metrics/HistogramStatistics.java      |  81 +++++
 .../org/apache/flink/metrics/MetricGroup.java   |  20 ++
 .../metrics/groups/AbstractMetricGroup.java     |  12 +
 .../groups/UnregisteredMetricsGroup.java        |  12 +-
 .../metrics/reporter/AbstractReporter.java      |  15 +
 .../flink/metrics/reporter/JMXReporter.java     |  98 +++++-
 .../flink/metrics/MetricRegistryTest.java       |   3 +-
 .../groups/MetricGroupRegistrationTest.java     |  21 ++
 .../flink/metrics/reporter/JMXReporterTest.java | 108 ++++++-
 .../flink-metrics-dropwizard/pom.xml            |  72 -----
 .../dropwizard/ScheduledDropwizardReporter.java | 140 --------
 .../dropwizard/metrics/CounterWrapper.java      |  33 --
 .../flink/dropwizard/metrics/GaugeWrapper.java  |  41 ---
 .../flink-metrics-ganglia/pom.xml               |  90 ------
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 -----
 .../flink-metrics-graphite/pom.xml              |  84 -----
 .../metrics/graphite/GraphiteReporter.java      |  63 ----
 .../flink-metrics-statsd/pom.xml                |  43 ---
 .../flink/metrics/statsd/StatsDReporter.java    | 143 ---------
 flink-metric-reporters/pom.xml                  |  42 ---
 flink-metrics/flink-metrics-dropwizard/pom.xml  |  80 +++++
 .../dropwizard/ScheduledDropwizardReporter.java | 162 ++++++++++
 .../metrics/DropwizardHistogramStatistics.java  |  70 ++++
 .../metrics/DropwizardHistogramWrapper.java     |  53 +++
 .../dropwizard/metrics/FlinkCounterWrapper.java |  33 ++
 .../dropwizard/metrics/FlinkGaugeWrapper.java   |  41 +++
 .../metrics/FlinkHistogramWrapper.java          |  52 +++
 .../metrics/HistogramStatisticsWrapper.java     |  86 +++++
 .../DropwizardFlinkHistogramWrapperTest.java    | 319 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/flink-metrics-ganglia/pom.xml     |  90 ++++++
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 +++++
 flink-metrics/flink-metrics-graphite/pom.xml    |  84 +++++
 .../metrics/graphite/GraphiteReporter.java      |  63 ++++
 flink-metrics/flink-metrics-statsd/pom.xml      |  51 +++
 .../flink/metrics/statsd/StatsDReporter.java    | 184 +++++++++++
 .../metrics/statsd/StatsDReporterTest.java      | 236 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/pom.xml                           |  42 +++
 pom.xml                                         |   2 +-
 43 files changed, 2264 insertions(+), 837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
new file mode 100644
index 0000000..3fd1253
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram interface to be used with Flink's metrics system.
+ *
+ * <p>The histogram allows recording values, getting the current count of recorded values and
+ * creating histogram statistics for the currently seen elements.
+ */
+@PublicEvolving
+public interface Histogram extends Metric {
+
+	/**
+	 * Update the histogram with the given value.
+	 *
+	 * @param value Value to update the histogram with
+	 */
+	void update(long value);
+
+	/**
+	 * Get the count of seen elements.
+	 *
+	 * @return Count of seen elements
+	 */
+	long getCount();
+
+	/**
+	 * Create statistics for the currently recorded elements.
+	 *
+	 * @return Statistics about the currently recorded elements
+	 */
+	HistogramStatistics getStatistics();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
new file mode 100644
index 0000000..476580c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Histogram statistics represent the current snapshot of elements recorded in the histogram.
+ *
+ * <p>The histogram statistics allow calculating values for quantiles, the mean, the standard
+ * deviation, the minimum and the maximum.
+ */
+@PublicEvolving
+public abstract class HistogramStatistics {
+
+	/**
+	 * Returns the value for the given quantile based on the represented histogram statistics.
+	 *
+	 * @param quantile Quantile to calculate the value for
+	 * @return Value for the given quantile
+	 */
+	public abstract double getQuantile(double quantile);
+
+	/**
+	 * Returns the elements of the statistics' sample.
+	 *
+	 * @return Elements of the statistics' sample
+	 */
+	public abstract long[] getValues();
+
+	/**
+	 * Returns the size of the statistics' sample.
+	 *
+	 * @return Size of the statistics' sample
+	 */
+	public abstract int size();
+
+	/**
+	 * Returns the mean of the histogram values.
+	 *
+	 * @return Mean of the histogram values
+	 */
+	public abstract double getMean();
+
+	/**
+	 * Returns the standard deviation of the distribution reflected by the histogram statistics.
+	 *
+	 * @return Standard deviation of histogram distribution
+	 */
+	public abstract double getStdDev();
+
+	/**
+	 * Returns the maximum value of the histogram.
+	 *
+	 * @return Maximum value of the histogram
+	 */
+	public abstract long getMax();
+
+	/**
+	 * Returns the minimum value of the histogram.
+	 *
+	 * @return Minimum value of the histogram
+	 */
+	public abstract long getMin();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index b131949..f46d3fc 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -115,6 +115,26 @@ public interface MetricGroup {
 	 */
 	<T, G extends Gauge<T>> G gauge(String name, G gauge);
 
+	/**
+	 * Registers a new {@link Histogram} with Flink.
+	 *
+	 * @param name name of the histogram
+	 * @param histogram histogram to register
+	 * @param <H> histogram type   
+	 * @return the registered histogram
+	 */
+	<H extends Histogram> H histogram(String name, H histogram);
+
+	/**
+	 * Registers a new {@link Histogram} with Flink.
+	 *
+	 * @param name name of the histogram
+	 * @param histogram histogram to register
+	 * @param <H> histogram type   
+	 * @return the registered histogram
+	 */
+	<H extends Histogram> H histogram(int name, H histogram);
+
 	// ------------------------------------------------------------------------
 	// Groups
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 93eb734..112957e 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
@@ -172,6 +173,17 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 		return gauge;
 	}
 
+	@Override
+	public <H extends Histogram> H histogram(int name, H histogram) {
+		// Delegate to the String variant, using the decimal representation of the number as name.
+		return histogram(String.valueOf(name), histogram);
+	}
+
+	@Override
+	public <H extends Histogram> H histogram(String name, H histogram) {
+		// Add the histogram to this group under the given name, then hand the same instance back.
+		addMetric(name, histogram);
+		return histogram;
+	}
+
 	/**
 	 * Adds the given metric to the group and registers it at the registry, if the group
 	 * is not yet closed, and if no metric with the same name has been registered before.

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 29d71d9..8e183df 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
@@ -71,7 +72,16 @@ public class UnregisteredMetricsGroup implements MetricGroup {
 		return gauge;
 	}
 
-	
+	@Override
+	public <H extends Histogram> H histogram(int name, H histogram) {
+		// Nothing is recorded here; the histogram is handed back untouched.
+		return histogram;
+	}
+
+	@Override
+	public <H extends Histogram> H histogram(String name, H histogram) {
+		// Nothing is recorded here; the histogram is handed back untouched.
+		return histogram;
+	}
+
 	@Override
 	public MetricGroup addGroup(int name) {
 		return addGroup(String.valueOf(name));

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index f2e78bf..8dacb7c 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -21,8 +21,11 @@ package org.apache.flink.metrics.reporter;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -32,9 +35,11 @@ import java.util.Map;
  */
 @PublicEvolving
 public abstract class AbstractReporter implements MetricReporter {
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	protected final Map<Gauge<?>, String> gauges = new HashMap<>();
 	protected final Map<Counter, String> counters = new HashMap<>();
+	protected final Map<Histogram, String> histograms = new HashMap<>();
 
 	@Override
 	public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
@@ -45,6 +50,11 @@ public abstract class AbstractReporter implements MetricReporter {
 				counters.put((Counter) metric, name);
 			} else if (metric instanceof Gauge) {
 				gauges.put((Gauge<?>) metric, name);
+			} else if (metric instanceof Histogram) {
+				histograms.put((Histogram) metric, name);
+			} else {
+				log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+					"does not support this metric type.", metric.getClass().getName());
 			}
 		}
 	}
@@ -56,6 +66,11 @@ public abstract class AbstractReporter implements MetricReporter {
 				counters.remove(metric);
 			} else if (metric instanceof Gauge) {
 				gauges.remove(metric);
+			} else if (metric instanceof Histogram) {
+				histograms.remove(metric);
+			} else {
+				log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+					"does not support this metric type.", metric.getClass().getName());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
index 326d6d7..eaf0ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.util.NetUtils;
@@ -146,8 +147,11 @@ public class JMXReporter implements MetricReporter {
 			jmxMetric = new JmxGauge((Gauge<?>) metric);
 		} else if (metric instanceof Counter) {
 			jmxMetric = new JmxCounter((Counter) metric);
+		} else if (metric instanceof Histogram) {
+			jmxMetric = new JmxHistogram((Histogram) metric);
 		} else {
-			LOG.error("Unknown metric type: " + metric.getClass().getName());
+			LOG.error("Cannot add unknown metric type: {}. This indicates that the metric type " +
+				"is not supported by this reporter.", metric.getClass().getName());
 			return;
 		}
 
@@ -285,7 +289,7 @@ public class JMXReporter implements MetricReporter {
 	private static class JmxCounter extends AbstractBean implements JmxCounterMBean {
 		private Counter counter;
 
-		public JmxCounter(Counter counter) {
+		JmxCounter(Counter counter) {
 			this.counter = counter;
 		}
 
@@ -303,7 +307,7 @@ public class JMXReporter implements MetricReporter {
 
 		private final Gauge<?> gauge;
 
-		public JmxGauge(Gauge<?> gauge) {
+		JmxGauge(Gauge<?> gauge) {
 			this.gauge = gauge;
 		}
 
@@ -313,6 +317,94 @@ public class JMXReporter implements MetricReporter {
 		}
 	}
 
+	/**
+	 * Management interface for histograms: declares one getter for the count, the mean, the
+	 * standard deviation, the maximum, the minimum, the median and each of the 75th, 95th, 98th,
+	 * 99th and 99.9th percentiles.
+	 */
+	public interface JmxHistogramMBean extends MetricMBean {
+		long getCount();
+
+		double getMean();
+
+		double getStdDev();
+
+		long getMax();
+
+		long getMin();
+
+		double getMedian();
+
+		double get75thPercentile();
+
+		double get95thPercentile();
+
+		double get98thPercentile();
+
+		double get99thPercentile();
+
+		double get999thPercentile();
+	}
+
+	/**
+	 * Bean that exposes a {@link Histogram} through {@link JmxHistogramMBean}. Every getter reads
+	 * from the wrapped histogram at call time; nothing is cached.
+	 */
+	private static class JmxHistogram extends AbstractBean implements JmxHistogramMBean {
+
+		private final Histogram histogram;
+
+		JmxHistogram(Histogram histogram) {
+			this.histogram = histogram;
+		}
+
+		/** Requests fresh statistics from the wrapped histogram and reads the given quantile. */
+		private double quantileOf(double quantile) {
+			return histogram.getStatistics().getQuantile(quantile);
+		}
+
+		@Override
+		public long getCount() {
+			return histogram.getCount();
+		}
+
+		@Override
+		public double getMean() {
+			return histogram.getStatistics().getMean();
+		}
+
+		@Override
+		public double getStdDev() {
+			return histogram.getStatistics().getStdDev();
+		}
+
+		@Override
+		public long getMax() {
+			return histogram.getStatistics().getMax();
+		}
+
+		@Override
+		public long getMin() {
+			return histogram.getStatistics().getMin();
+		}
+
+		@Override
+		public double getMedian() {
+			return quantileOf(0.5);
+		}
+
+		@Override
+		public double get75thPercentile() {
+			return quantileOf(0.75);
+		}
+
+		@Override
+		public double get95thPercentile() {
+			return quantileOf(0.95);
+		}
+
+		@Override
+		public double get98thPercentile() {
+			return quantileOf(0.98);
+		}
+
+		@Override
+		public double get99thPercentile() {
+			return quantileOf(0.99);
+		}
+
+		@Override
+		public double get999thPercentile() {
+			return quantileOf(0.999);
+		}
+	}
+
 	/**
 	 * JMX Server implementation that JMX clients can connect to.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index f8e0bf5..8b71816 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -25,12 +25,13 @@ import org.apache.flink.metrics.groups.scope.ScopeFormats;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.metrics.util.TestReporter;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-public class MetricRegistryTest {
+public class MetricRegistryTest extends TestLogger {
 	
 	/**
 	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
index 7b35d91..c7a112a 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
@@ -57,6 +59,25 @@ public class MetricGroupRegistrationTest {
 		Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
 		assertEquals("gauge", TestReporter1.lastPassedName);
 
+		Histogram histogram = root.histogram("histogram", new Histogram() {
+			@Override
+			public void update(long value) {
+
+			}
+
+			@Override
+			public long getCount() {
+				return 0;
+			}
+
+			@Override
+			public HistogramStatistics getStatistics() {
+				return null;
+			}
+		});
+
+		Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
+		assertEquals("histogram", TestReporter1.lastPassedName);
 		registry.shutdown();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
index d25f744..9e638a7 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java
@@ -20,11 +20,16 @@ package org.apache.flink.metrics.reporter;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
@@ -37,7 +42,7 @@ import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_JMX_PORT;
 import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_CLASS;
 import static org.junit.Assert.assertEquals;
 
-public class JMXReporterTest {
+public class JMXReporterTest extends TestLogger {
 
 	@Test
 	public void testReplaceInvalidChars() {
@@ -188,4 +193,105 @@ public class JMXReporterTest {
 		rep2.close();
 		reg.shutdown();
 	}
+
+	/**
+	 * Tests that histograms are properly reported via the JMXReporter.
+	 *
+	 * <p>Registers a {@link TestingHistogram} on a task manager metric group, looks the resulting
+	 * bean up on the platform MBean server and compares every attribute with the value returned by
+	 * the histogram itself.
+	 */
+	@Test
+	public void testHistogramReporting() throws Exception {
+		MetricRegistry registry = null;
+		String histogramName = "histogram";
+
+		try {
+			// NOTE(review): no reporter class is configured here; presumably the registry falls
+			// back to the JMX reporter by default -- confirm in MetricRegistry.
+			Configuration config = new Configuration();
+
+			registry = new MetricRegistry(config);
+
+			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+			TestingHistogram histogram = new TestingHistogram();
+
+			metricGroup.histogram(histogramName, histogram);
+
+			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+			ObjectName objectName = new ObjectName(JMXReporter.generateJmxName(histogramName, metricGroup.getScopeComponents()));
+
+			MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+
+			MBeanAttributeInfo[] attributeInfos = info.getAttributes();
+
+			// Eleven attributes are read back below: Count, Mean, StdDev, Max, Min, Median and
+			// the five percentiles.
+			assertEquals(11, attributeInfos.length);
+
+			assertEquals(histogram.getCount(), mBeanServer.getAttribute(objectName, "Count"));
+			assertEquals(histogram.getStatistics().getMean(), mBeanServer.getAttribute(objectName, "Mean"));
+			assertEquals(histogram.getStatistics().getStdDev(), mBeanServer.getAttribute(objectName, "StdDev"));
+			assertEquals(histogram.getStatistics().getMax(), mBeanServer.getAttribute(objectName, "Max"));
+			assertEquals(histogram.getStatistics().getMin(), mBeanServer.getAttribute(objectName, "Min"));
+			assertEquals(histogram.getStatistics().getQuantile(0.5), mBeanServer.getAttribute(objectName, "Median"));
+			assertEquals(histogram.getStatistics().getQuantile(0.75), mBeanServer.getAttribute(objectName, "75thPercentile"));
+			assertEquals(histogram.getStatistics().getQuantile(0.95), mBeanServer.getAttribute(objectName, "95thPercentile"));
+			assertEquals(histogram.getStatistics().getQuantile(0.98), mBeanServer.getAttribute(objectName, "98thPercentile"));
+			assertEquals(histogram.getStatistics().getQuantile(0.99), mBeanServer.getAttribute(objectName, "99thPercentile"));
+			assertEquals(histogram.getStatistics().getQuantile(0.999), mBeanServer.getAttribute(objectName, "999thPercentile"));
+
+		} finally {
+			// Shut the registry down even when an assertion above fails.
+			if (registry != null) {
+				registry.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Histogram stub with fixed return values: {@code update} is a no-op, the count is always 1,
+	 * and the statistics return a different constant from each getter while {@code getQuantile}
+	 * echoes its argument.
+	 */
+	static class TestingHistogram implements Histogram {
+
+		@Override
+		public void update(long value) {
+
+		}
+
+		@Override
+		public long getCount() {
+			return 1;
+		}
+
+		@Override
+		public HistogramStatistics getStatistics() {
+			// A new anonymous statistics object is created on every call.
+			return new HistogramStatistics() {
+				@Override
+				public double getQuantile(double quantile) {
+					// Echo the requested quantile.
+					return quantile;
+				}
+
+				@Override
+				public long[] getValues() {
+					return new long[0];
+				}
+
+				@Override
+				public int size() {
+					return 3;
+				}
+
+				@Override
+				public double getMean() {
+					return 4;
+				}
+
+				@Override
+				public double getStdDev() {
+					return 5;
+				}
+
+				@Override
+				public long getMax() {
+					return 6;
+				}
+
+				@Override
+				public long getMin() {
+					return 7;
+				}
+			};
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
deleted file mode 100644
index a386880..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-metric-reporters</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-metrics-dropwizard</artifactId>
-	<name>flink-metrics-dropwizard</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<version>2.4</version>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
deleted file mode 100644
index d67f3e3..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-import com.codahale.metrics.ScheduledReporter;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.metrics.CounterWrapper;
-import org.apache.flink.dropwizard.metrics.GaugeWrapper;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
- * Dropwizard {@link com.codahale.metrics.Reporter}.
- */
-@PublicEvolving
-public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
-
-	public static final String ARG_HOST = "host";
-	public static final String ARG_PORT = "port";
-	public static final String ARG_PREFIX = "prefix";
-	public static final String ARG_CONVERSION_RATE = "rateConversion";
-	public static final String ARG_CONVERSION_DURATION = "durationConversion";
-
-	// ------------------------------------------------------------------------
-
-	protected final MetricRegistry registry;
-
-	protected ScheduledReporter reporter;
-
-	private final Map<Gauge<?>, String> gauges = new HashMap<>();
-	private final Map<Counter, String> counters = new HashMap<>();
-
-	// ------------------------------------------------------------------------
-
-	protected ScheduledDropwizardReporter() {
-		this.registry = new MetricRegistry();
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration config) {
-		this.reporter = getReporter(config);
-	}
-
-	@Override
-	public void close() {
-		this.reporter.stop();
-	}
-
-	// ------------------------------------------------------------------------
-	//  adding / removing metrics
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
-		final String fullName = group.getScopeString() + '.' + metricName;
-
-		synchronized (this) {
-			if (metric instanceof Counter) {
-				counters.put((Counter) metric, fullName);
-				registry.register(fullName, new CounterWrapper((Counter) metric));
-			}
-			else if (metric instanceof Gauge) {
-				gauges.put((Gauge<?>) metric, fullName);
-				registry.register(fullName, GaugeWrapper.fromGauge((Gauge<?>) metric));
-			}
-		}
-	}
-
-	@Override
-	public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
-		synchronized (this) {
-			String fullName;
-			
-			if (metric instanceof Counter) {
-				fullName = counters.remove(metric);
-			} else if (metric instanceof Gauge) {
-				fullName = gauges.remove(metric);
-			} else {
-				fullName = null;
-			}
-			
-			if (fullName != null) {
-				registry.remove(fullName);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  scheduled reporting
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void report() {
-		// we do not need to lock here, because the dropwizard registry is
-		// internally a concurrent map
-		@SuppressWarnings("rawtypes")
-		final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
-		final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
-		final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
-		final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters();
-		final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
-
-		this.reporter.report(gauges, counters, histograms, meters, timers);
-	}
-
-	public abstract ScheduledReporter getReporter(Configuration config);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
deleted file mode 100644
index f6630b9..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Counter;
-
-public class CounterWrapper extends com.codahale.metrics.Counter {
-	private final Counter counter;
-
-	public CounterWrapper(Counter counter) {
-		this.counter = counter;
-	}
-
-	@Override
-	public long getCount() {
-		return this.counter.getCount();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
deleted file mode 100644
index 655cd60..0000000
--- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.dropwizard.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
-	
-	private final Gauge<T> gauge;
-
-	public GaugeWrapper(Gauge<T> gauge) {
-		this.gauge = gauge;
-	}
-
-	@Override
-	public T getValue() {
-		return this.gauge.getValue();
-	}
-
-	public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) {
-		@SuppressWarnings("unchecked")
-		Gauge<T> typedGauge = (Gauge<T>) gauge;
-		return new GaugeWrapper<>(typedGauge); 
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml b/flink-metric-reporters/flink-metrics-ganglia/pom.xml
deleted file mode 100644
index a457ca1..0000000
--- a/flink-metric-reporters/flink-metrics-ganglia/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-metric-reporters</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-metrics-ganglia</artifactId>
-	<name>flink-metrics-ganglia</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-metrics-dropwizard</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>info.ganglia.gmetric4j</groupId>
-			<artifactId>gmetric4j</artifactId>
-			<version>1.0.7</version>
-		</dependency>
-
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-ganglia</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<version>2.4</version>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
deleted file mode 100644
index adf9394..0000000
--- a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.ganglia;
-
-import com.codahale.metrics.ScheduledReporter;
-
-import info.ganglia.gmetric4j.gmetric.GMetric;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GangliaReporter extends ScheduledDropwizardReporter {
-	
-	public static final String ARG_DMAX = "dmax";
-	public static final String ARG_TMAX = "tmax";
-	public static final String ARG_TTL = "ttl";
-	public static final String ARG_MODE_ADDRESSING = "addressingMode";
-
-	@Override
-	public ScheduledReporter getReporter(Configuration config) {
-
-		try {
-			String host = config.getString(ARG_HOST, null);
-			int port = config.getInteger(ARG_PORT, -1);
-			if (host == null || host.length() == 0 || port < 1) {
-				throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
-			}
-			String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
-			int ttl = config.getInteger(ARG_TTL, -1);
-			GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
-
-			String prefix = config.getString(ARG_PREFIX, null);
-			String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
-			String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
-			int dMax = config.getInteger(ARG_DMAX, 0);
-			int tMax = config.getInteger(ARG_TMAX, 60);
-
-			com.codahale.metrics.ganglia.GangliaReporter.Builder builder =
-				com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
-
-			if (prefix != null) {
-				builder.prefixedWith(prefix);
-			}
-			if (conversionRate != null) {
-				builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-			}
-			if (conversionDuration != null) {
-				builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-			}
-			builder.withDMax(dMax);
-			builder.withTMax(tMax);
-
-			return builder.build(gMetric);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while instantiating GangliaReporter.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml b/flink-metric-reporters/flink-metrics-graphite/pom.xml
deleted file mode 100644
index 714b77f..0000000
--- a/flink-metric-reporters/flink-metrics-graphite/pom.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-metric-reporters</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-metrics-graphite</artifactId>
-	<name>flink-metrics-graphite</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-metrics-dropwizard</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>io.dropwizard.metrics</groupId>
-			<artifactId>metrics-graphite</artifactId>
-			<version>${metrics.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<version>2.4</version>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
deleted file mode 100644
index 16be830..0000000
--- a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.graphite;
-
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
-
-import java.util.concurrent.TimeUnit;
-
-@PublicEvolving
-public class GraphiteReporter extends ScheduledDropwizardReporter {
-
-	@Override
-	public ScheduledReporter getReporter(Configuration config) {
-		String host = config.getString(ARG_HOST, null);
-		int port = config.getInteger(ARG_PORT, -1);
-
-		if (host == null || host.length() == 0 || port < 1) {
-			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
-		}
-
-		String prefix = config.getString(ARG_PREFIX, null);
-		String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
-		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
-
-		com.codahale.metrics.graphite.GraphiteReporter.Builder builder =
-			com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
-
-		if (prefix != null) {
-			builder.prefixedWith(prefix);
-		}
-
-		if (conversionRate != null) {
-			builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
-		}
-
-		if (conversionDuration != null) {
-			builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
-		}
-
-		return builder.build(new Graphite(host, port));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml b/flink-metric-reporters/flink-metrics-statsd/pom.xml
deleted file mode 100644
index 3052a10..0000000
--- a/flink-metric-reporters/flink-metrics-statsd/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-metric-reporters</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-metrics-statsd</artifactId>
-	<name>flink-metrics-statsd</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
deleted file mode 100644
index 087a265..0000000
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.statsd;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.reporter.AbstractReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.util.ConcurrentModificationException;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Largely based on the StatsDReporter class by ReadyTalk
- * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
- *
- * Ported since it was not present in maven central.
- */
-@PublicEvolving
-public class StatsDReporter extends AbstractReporter implements Scheduled {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
-
-	public static final String ARG_HOST = "host";
-	public static final String ARG_PORT = "port";
-//	public static final String ARG_CONVERSION_RATE = "rateConversion";
-//	public static final String ARG_CONVERSION_DURATION = "durationConversion";
-
-	private boolean closed = false;
-
-	private DatagramSocket socket;
-	private InetSocketAddress address;
-
-	@Override
-	public void open(Configuration config) {
-		String host = config.getString(ARG_HOST, null);
-		int port = config.getInteger(ARG_PORT, -1);
-
-		if (host == null || host.length() == 0 || port < 1) {
-			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
-		}
-
-		this.address = new InetSocketAddress(host, port);
-
-//		String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
-//		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-//		this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-//		this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
-
-		try {
-			this.socket = new DatagramSocket(0);
-		} catch (SocketException e) {
-			throw new RuntimeException("Could not create datagram socket. ", e);
-		}
-	}
-
-	@Override
-	public void close() {
-		closed = true;
-		if (socket != null && !socket.isClosed()) {
-			socket.close();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void report() {
-		// instead of locking here, we tolerate exceptions
-		// we do this to prevent holding the lock for very long and blocking
-		// operator creation and shutdown
-		try {
-			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
-				if (closed) {
-					return;
-				}
-				reportGauge(entry.getValue(), entry.getKey());
-			}
-
-			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
-				if (closed) {
-					return;
-				}
-				reportCounter(entry.getValue(), entry.getKey());
-			}
-		}
-		catch (ConcurrentModificationException | NoSuchElementException e) {
-			// ignore - may happen when metrics are concurrently added or removed
-			// report next time
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private void reportCounter(final String name, final Counter counter) {
-		send(name, String.valueOf(counter.getCount()));
-	}
-
-	private void reportGauge(final String name, final Gauge<?> gauge) {
-		Object value = gauge.getValue();
-		if (value != null) {
-			send(name, value.toString());
-		}
-	}
-
-	private void send(final String name, final String value) {
-		try {
-			String formatted = String.format("%s:%s|g", name, value);
-			byte[] data = formatted.getBytes();
-			socket.send(new DatagramPacket(data, data.length, this.address));
-		}
-		catch (IOException e) {
-			LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metric-reporters/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml
deleted file mode 100644
index 01a809c..0000000
--- a/flink-metric-reporters/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-metric-reporters</artifactId>
-	<name>flink-metric-reporters</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-metrics-dropwizard</module>
-		<module>flink-metrics-ganglia</module>
-		<module>flink-metrics-graphite</module>
-		<module>flink-metrics-statsd</module>
-	</modules>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml
new file mode 100644
index 0000000..90dbc00
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-dropwizard</artifactId>
+	<name>flink-metrics-dropwizard</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
new file mode 100644
index 0000000..062bbd8
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+import com.codahale.metrics.ScheduledReporter;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a
+ * Dropwizard {@link com.codahale.metrics.Reporter}.
+ *
+ * <p>Each Flink metric announced through {@link #notifyOfAddedMetric} is registered in a private
+ * Dropwizard {@link MetricRegistry} under the name {@code <scope string>.<metric name>}. Every
+ * call to {@link #report()} passes the registry's current content to the
+ * {@link ScheduledReporter} created by {@link #getReporter(Configuration)}.
+ */
+@PublicEvolving
+public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter {
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	// Names of reporter arguments. This base class does not read them itself;
+	// presumably the concrete reporters look them up in the configuration.
+	public static final String ARG_HOST = "host";
+	public static final String ARG_PORT = "port";
+	public static final String ARG_PREFIX = "prefix";
+	public static final String ARG_CONVERSION_RATE = "rateConversion";
+	public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+	// ------------------------------------------------------------------------
+
+	/** Dropwizard registry that holds the wrapped Flink metrics. */
+	protected final MetricRegistry registry;
+
+	/** The Dropwizard reporter; assigned in {@link #open(Configuration)}. */
+	protected ScheduledReporter reporter;
+
+	// Flink metric -> name it was registered under, needed to find the registry
+	// entry again when the metric is removed. Guarded by "this".
+	private final Map<Gauge<?>, String> gauges = new HashMap<>();
+	private final Map<Counter, String> counters = new HashMap<>();
+	private final Map<Histogram, String> histograms = new HashMap<>();
+
+	// ------------------------------------------------------------------------
+
+	protected ScheduledDropwizardReporter() {
+		this.registry = new MetricRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the wrapped Dropwizard reporter via {@link #getReporter(Configuration)}.
+	 *
+	 * @param config configuration that is handed to {@link #getReporter(Configuration)}
+	 */
+	@Override
+	public void open(Configuration config) {
+		this.reporter = getReporter(config);
+	}
+
+	/**
+	 * Stops the wrapped Dropwizard reporter.
+	 */
+	@Override
+	public void close() {
+		this.reporter.stop();
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding / removing metrics
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers the given metric in the Dropwizard registry, wrapped in the matching adapter.
+	 * Metrics that are neither counter, gauge nor histogram are only logged.
+	 *
+	 * @param metric     the metric that was added
+	 * @param metricName the name of the metric within its group
+	 * @param group      the group whose scope string prefixes the registered name
+	 * @throws IllegalArgumentException if the registry rejects the name, e.g. because another
+	 *                                  metric is already registered under it
+	 */
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+		final String fullName = group.getScopeString() + '.' + metricName;
+
+		synchronized (this) {
+			// Register with Dropwizard first and record the name only afterwards:
+			// register(...) throws for a name that is already taken, and in that case
+			// the name must not be remembered for this metric. Otherwise removing this
+			// metric later would unregister the metric that actually owns the name.
+			if (metric instanceof Counter) {
+				registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
+				counters.put((Counter) metric, fullName);
+			}
+			else if (metric instanceof Gauge) {
+				registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
+				gauges.put((Gauge<?>) metric, fullName);
+			} else if (metric instanceof Histogram) {
+				Histogram histogram = (Histogram) metric;
+
+				if (histogram instanceof DropwizardHistogramWrapper) {
+					// already backed by a Dropwizard histogram, register that one directly
+					registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
+				} else {
+					registry.register(fullName, new FlinkHistogramWrapper(histogram));
+				}
+				histograms.put(histogram, fullName);
+			} else {
+				log.warn("Cannot add metric of type {}. This indicates that the reporter " +
+					"does not support this metric type.", metric.getClass().getName());
+			}
+		}
+	}
+
+	/**
+	 * Removes the given metric from the Dropwizard registry, if it was registered by this
+	 * reporter. The registry name is taken from the internal bookkeeping, not recomputed.
+	 *
+	 * @param metric     the metric that was removed
+	 * @param metricName the name of the metric within its group (not used here)
+	 * @param group      the group the metric belonged to (not used here)
+	 */
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+		synchronized (this) {
+			String fullName;
+
+			if (metric instanceof Counter) {
+				fullName = counters.remove(metric);
+			} else if (metric instanceof Gauge) {
+				fullName = gauges.remove(metric);
+			} else if (metric instanceof Histogram) {
+				fullName = histograms.remove(metric);
+			} else {
+				fullName = null;
+			}
+
+			// null means the metric was never recorded by this reporter
+			if (fullName != null) {
+				registry.remove(fullName);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  scheduled reporting
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Hands all metrics currently in the registry to the wrapped Dropwizard reporter.
+	 */
+	@Override
+	public void report() {
+		// we do not need to lock here, because the dropwizard registry is
+		// internally a concurrent map
+		@SuppressWarnings("rawtypes")
+		final SortedMap<String, com.codahale.metrics.Gauge> gauges = registry.getGauges();
+		final SortedMap<String, com.codahale.metrics.Counter> counters = registry.getCounters();
+		final SortedMap<String, com.codahale.metrics.Histogram> histograms = registry.getHistograms();
+		final SortedMap<String, com.codahale.metrics.Meter> meters = registry.getMeters();
+		final SortedMap<String, com.codahale.metrics.Timer> timers = registry.getTimers();
+
+		this.reporter.report(gauges, counters, histograms, meters, timers);
+	}
+
+	/**
+	 * Creates the Dropwizard reporter that this class delegates to.
+	 *
+	 * @param config the configuration passed to {@link #open(Configuration)}
+	 * @return the reporter used by {@link #report()} and stopped by {@link #close()}
+	 */
+	public abstract ScheduledReporter getReporter(Configuration config);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
new file mode 100644
index 0000000..6f4eab2
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * {@link HistogramStatistics} view over a Dropwizard {@link Snapshot}, as handed out by
+ * {@link DropwizardHistogramWrapper}. Every accessor forwards to the wrapped snapshot.
+ */
+class DropwizardHistogramStatistics extends HistogramStatistics {
+
+	/** The Dropwizard snapshot that answers all queries. */
+	private final Snapshot dropwizardSnapshot;
+
+	/**
+	 * @param snapshot the Dropwizard snapshot to expose as Flink histogram statistics
+	 */
+	DropwizardHistogramStatistics(Snapshot snapshot) {
+		dropwizardSnapshot = snapshot;
+	}
+
+	// ---- sample access ----
+
+	@Override
+	public int size() {
+		return dropwizardSnapshot.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return dropwizardSnapshot.getValues();
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		return dropwizardSnapshot.getValue(quantile);
+	}
+
+	// ---- aggregates ----
+
+	@Override
+	public long getMin() {
+		return dropwizardSnapshot.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return dropwizardSnapshot.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return dropwizardSnapshot.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return dropwizardSnapshot.getStdDev();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
new file mode 100644
index 0000000..79a6a56
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+/**
+ * Wrapper to use a Dropwizard {@link com.codahale.metrics.Histogram} as a Flink {@link Histogram}.
+ * Updates and reads are forwarded to the wrapped Dropwizard histogram.
+ */
+public class DropwizardHistogramWrapper implements Histogram {
+
+	/** The Dropwizard histogram that stores the samples. */
+	private final com.codahale.metrics.Histogram dropwizardHistogram;
+
+	/**
+	 * @param dropwizardHistogram the Dropwizard histogram to wrap
+	 */
+	public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram) {
+		this.dropwizardHistogram = dropwizardHistogram;
+	}
+
+	/**
+	 * Returns the wrapped Dropwizard histogram.
+	 *
+	 * <p>The method name is missing a 'd' ("Dropwizar"); the spelling is kept because
+	 * ScheduledDropwizardReporter calls the method under this name.
+	 */
+	public com.codahale.metrics.Histogram getDropwizarHistogram() {
+		return this.dropwizardHistogram;
+	}
+
+	@Override
+	public void update(long value) {
+		this.dropwizardHistogram.update(value);
+	}
+
+	@Override
+	public long getCount() {
+		return this.dropwizardHistogram.getCount();
+	}
+
+	/**
+	 * Takes a snapshot of the wrapped histogram and exposes it as Flink statistics.
+	 * A new statistics object is created on every call.
+	 */
+	@Override
+	public HistogramStatistics getStatistics() {
+		final com.codahale.metrics.Snapshot current = this.dropwizardHistogram.getSnapshot();
+		return new DropwizardHistogramStatistics(current);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
new file mode 100644
index 0000000..a44c3f5
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Adapter that presents a Flink {@link Counter} as a Dropwizard
+ * {@link com.codahale.metrics.Counter}, so that it can be placed in a Dropwizard registry.
+ *
+ * <p>Reads and writes are all forwarded to the wrapped Flink counter. The count that the
+ * Dropwizard superclass keeps on its own is never used; without forwarding the mutators,
+ * increments made through this wrapper would not be visible in {@link #getCount()}.
+ */
+public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
+
+	/** The Flink counter that holds the actual count. */
+	private final Counter counter;
+
+	/**
+	 * @param counter the Flink counter to expose to Dropwizard
+	 */
+	public FlinkCounterWrapper(Counter counter) {
+		this.counter = counter;
+	}
+
+	/**
+	 * @return the current count of the wrapped Flink counter
+	 */
+	@Override
+	public long getCount() {
+		return this.counter.getCount();
+	}
+
+	@Override
+	public void inc() {
+		this.counter.inc();
+	}
+
+	@Override
+	public void inc(long n) {
+		this.counter.inc(n);
+	}
+
+	@Override
+	public void dec() {
+		this.counter.dec();
+	}
+
+	@Override
+	public void dec(long n) {
+		this.counter.dec(n);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
new file mode 100644
index 0000000..058ecad
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Adapter that presents a Flink {@link Gauge} as a Dropwizard
+ * {@link com.codahale.metrics.Gauge}; the value is read from the Flink gauge on every call.
+ *
+ * @param <T> type of the gauge value
+ */
+public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
+
+	/** The Flink gauge that supplies the value. */
+	private final Gauge<T> gauge;
+
+	public FlinkGaugeWrapper(Gauge<T> gauge) {
+		this.gauge = gauge;
+	}
+
+	/**
+	 * Wraps a gauge whose value type is not known statically.
+	 *
+	 * @param gauge the Flink gauge to wrap
+	 * @return a wrapper around {@code gauge}, typed as the caller requests
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge) {
+		// unchecked: the wildcard gauge is simply viewed as Gauge<T>
+		return new FlinkGaugeWrapper<>((Gauge<T>) gauge);
+	}
+
+	@Override
+	public T getValue() {
+		return gauge.getValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
new file mode 100644
index 0000000..8bd8078
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.Histogram;
+
+/**
+ * Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}.
+ * This is necessary to report Flink's histograms via the Dropwizard
+ * {@link com.codahale.metrics.Reporter}.
+ */
+public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
+
+	/** The Flink histogram that receives updates and answers reads. */
+	private final Histogram histogram;
+
+	/**
+	 * @param histogram the Flink histogram to expose to Dropwizard
+	 */
+	public FlinkHistogramWrapper(Histogram histogram) {
+		// The superclass gets no backing store of its own; presumably this is safe
+		// because update, getCount and getSnapshot are all overridden below.
+		super(null);
+		this.histogram = histogram;
+	}
+
+	@Override
+	public long getCount() {
+		return this.histogram.getCount();
+	}
+
+	@Override
+	public void update(long value) {
+		this.histogram.update(value);
+	}
+
+	/**
+	 * Wraps the Flink histogram's current statistics as a Dropwizard snapshot.
+	 * A new wrapper object is created on every call.
+	 */
+	@Override
+	public Snapshot getSnapshot() {
+		final HistogramStatisticsWrapper snapshot =
+			new HistogramStatisticsWrapper(this.histogram.getStatistics());
+		return snapshot;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
new file mode 100644
index 0000000..6d3a69b
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+/**
+ * Wrapper to use Flink's {@link HistogramStatistics} as a Dropwizard {@link Snapshot}. This is
+ * necessary to report Flink's histograms via the Dropwizard {@link com.codahale.metrics.Reporter}.
+ */
+class HistogramStatisticsWrapper extends Snapshot {
+
+	/** Character set used when dumping the samples as text. */
+	private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+	/** The Flink statistics that answer all queries. */
+	private final HistogramStatistics histogramStatistics;
+
+	HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
+		this.histogramStatistics = histogramStatistics;
+	}
+
+	// ---- sample access ----
+
+	@Override
+	public int size() {
+		return this.histogramStatistics.size();
+	}
+
+	@Override
+	public long[] getValues() {
+		return this.histogramStatistics.getValues();
+	}
+
+	@Override
+	public double getValue(double quantile) {
+		return this.histogramStatistics.getQuantile(quantile);
+	}
+
+	// ---- aggregates ----
+
+	@Override
+	public long getMin() {
+		return this.histogramStatistics.getMin();
+	}
+
+	@Override
+	public long getMax() {
+		return this.histogramStatistics.getMax();
+	}
+
+	@Override
+	public double getMean() {
+		return this.histogramStatistics.getMean();
+	}
+
+	@Override
+	public double getStdDev() {
+		return this.histogramStatistics.getStdDev();
+	}
+
+	/**
+	 * Writes all sample values to the given stream as UTF-8 text, one value per line.
+	 *
+	 * @param output the stream to write to; it is closed when this method returns
+	 */
+	@Override
+	public void dump(OutputStream output) {
+		// try-with-resources closes the writer, and with it the given stream
+		try (PrintWriter out = new PrintWriter(new OutputStreamWriter(output, UTF_8))) {
+			final long[] samples = this.histogramStatistics.getValues();
+			for (int i = 0; i < samples.length; i++) {
+				out.printf("%d%n", samples[i]);
+			}
+		}
+	}
+}