You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 13:59:19 UTC

[2/3] flink git commit: [FLINK-3951] Add Histogram metric type

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
new file mode 100644
index 0000000..2479c26
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
+
+	/**
+	 * Verifies count, min, max, size and median of a DropwizardHistogramWrapper that is backed
+	 * by a SlidingWindowReservoir, first while the window fills up and then while further
+	 * updates push the earliest values out of the window.
+	 */
+	@Test
+	public void testDropwizardHistogramWrapper() {
+		final int windowSize = 10;
+		final DropwizardHistogramWrapper wrapper = new DropwizardHistogramWrapper(
+			new com.codahale.metrics.Histogram(new SlidingWindowReservoir(windowSize)));
+
+		// phase 1: record 0 .. windowSize - 1, the minimum stays at 0
+		int next = 0;
+		while (next < windowSize) {
+			wrapper.update(next);
+
+			assertEquals(next + 1, wrapper.getCount());
+			assertEquals(next, wrapper.getStatistics().getMax());
+			assertEquals(0, wrapper.getStatistics().getMin());
+
+			next++;
+		}
+
+		assertEquals(windowSize, wrapper.getStatistics().size());
+		assertEquals((windowSize - 1) / 2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+
+		// phase 2: record windowSize .. 2 * windowSize - 1, the minimum now moves with the window
+		while (next < 2 * windowSize) {
+			wrapper.update(next);
+
+			assertEquals(next + 1, wrapper.getCount());
+			assertEquals(next, wrapper.getStatistics().getMax());
+			assertEquals(next + 1 - windowSize, wrapper.getStatistics().getMin());
+
+			next++;
+		}
+
+		assertEquals(windowSize, wrapper.getStatistics().size());
+		assertEquals(windowSize + (windowSize - 1) / 2.0, wrapper.getStatistics().getQuantile(0.5), 0.001);
+	}
+
+	/**
+	 * Tests that a histogram registered at the MetricRegistry is handed to the
+	 * ScheduledReporter as a dropwizard snapshot with the expected statistics, and that
+	 * unregistering the histogram removes it from the reporter's metrics again.
+	 */
+	@Test
+	public void testDropwizardHistogramWrapperReporting() throws Exception {
+		final long reportIntervalMs = 1000;
+		final long maxWaitMs = 30000;
+		final int windowSize = 10;
+		final String metricName = "histogram";
+
+		final Configuration configuration = new Configuration();
+		configuration.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName());
+		configuration.setString(KEY_METRICS_REPORTER_INTERVAL, reportIntervalMs + " MILLISECONDS");
+
+		MetricRegistry metricRegistry = null;
+
+		try {
+			metricRegistry = new MetricRegistry(configuration);
+
+			final DropwizardHistogramWrapper wrapper = new DropwizardHistogramWrapper(
+				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(windowSize)));
+
+			final TaskManagerMetricGroup group = new TaskManagerMetricGroup(metricRegistry, "localhost", "tmId");
+			group.histogram(metricName, wrapper);
+
+			final String qualifiedName = group.getScopeString() + "." + metricName;
+
+			// read the registry's "reporter" field reflectively to get hold of the reporter instance
+			final Field reporterField = metricRegistry.getClass().getDeclaredField("reporter");
+			reporterField.setAccessible(true);
+			final MetricReporter configuredReporter = (MetricReporter) reporterField.get(metricRegistry);
+
+			assertTrue(configuredReporter instanceof TestingReporter);
+
+			final TestingReporter testingReporter = (TestingReporter) configuredReporter;
+			final TestingScheduledReporter scheduled = testingReporter.scheduledReporter;
+
+			// check that the metric has been registered
+			assertEquals(1, testingReporter.getMetrics().size());
+
+			int sample = 0;
+			while (sample < windowSize) {
+				wrapper.update(sample);
+				sample++;
+			}
+
+			// request the snapshot of the next report and wait for it
+			final Future<Snapshot> pendingSnapshot = scheduled.getNextHistogramSnapshot(qualifiedName);
+			final Snapshot reported = pendingSnapshot.get(maxWaitMs, TimeUnit.MILLISECONDS);
+
+			assertEquals(0, reported.getMin());
+			assertEquals((windowSize - 1) / 2.0, reported.getMedian(), 0.001);
+			assertEquals(windowSize - 1, reported.getMax());
+			assertEquals(windowSize, reported.size());
+
+			metricRegistry.unregister(wrapper, "histogram", group);
+
+			// check that the metric has been de-registered
+			assertEquals(0, testingReporter.getMetrics().size());
+		} finally {
+			if (metricRegistry != null) {
+				metricRegistry.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * ScheduledDropwizardReporter for tests: it creates a TestingScheduledReporter and keeps a
+	 * reference to it so that the test can access the reported snapshots.
+	 */
+	public static class TestingReporter extends ScheduledDropwizardReporter {
+		TestingScheduledReporter scheduledReporter = null;
+
+		/**
+		 * Creates the TestingScheduledReporter on the inherited registry, named after this
+		 * class, without a metric filter and with milliseconds as rate and duration unit.
+		 */
+		@Override
+		public ScheduledReporter getReporter(Configuration config) {
+			final TestingScheduledReporter created = new TestingScheduledReporter(
+				registry,
+				getClass().getName(),
+				null,
+				TimeUnit.MILLISECONDS,
+				TimeUnit.MILLISECONDS);
+
+			scheduledReporter = created;
+			return created;
+		}
+
+		/** Returns the metrics of the inherited {@code registry}. */
+		public Map<String, com.codahale.metrics.Metric> getMetrics() {
+			return registry.getMetrics();
+		}
+	}
+
+	/**
+	 * ScheduledReporter that records the snapshot of every reported histogram and completes
+	 * the futures that were requested for that histogram's name.
+	 */
+	static class TestingScheduledReporter extends ScheduledReporter {
+
+		final Map<String, Snapshot> histogramSnapshots = new HashMap<>();
+		final Map<String, List<CompletableFuture<Snapshot>>> histogramSnapshotFutures = new HashMap<>();
+
+		protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) {
+			super(registry, name, filter, rateUnit, durationUnit);
+		}
+
+		/** Forwards every histogram to {@link #reportHistogram}; all other metric types are ignored. */
+		@Override
+		public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, com.codahale.metrics.Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+			for (String histogramName: histograms.keySet()) {
+				reportHistogram(histogramName, histograms.get(histogramName));
+			}
+		}
+
+		/**
+		 * Stores the current snapshot of the histogram under its name and completes all
+		 * futures waiting for that name, each with a snapshot of its own.
+		 */
+		void reportHistogram(String name, com.codahale.metrics.Histogram histogram) {
+			histogramSnapshots.put(name, histogram.getSnapshot());
+
+			synchronized (histogramSnapshotFutures) {
+				final List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.remove(name);
+
+				if (waiting == null) {
+					// nobody asked for a snapshot of this histogram
+					return;
+				}
+
+				for (CompletableFuture<Snapshot> pending: waiting) {
+					pending.complete(histogram.getSnapshot());
+				}
+			}
+		}
+
+		/**
+		 * Returns a future that is completed with a snapshot the next time a histogram with
+		 * the given name is reported.
+		 */
+		Future<Snapshot> getNextHistogramSnapshot(String name) {
+			final CompletableFuture<Snapshot> pending = new CompletableFuture<>();
+
+			synchronized (histogramSnapshotFutures) {
+				List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.get(name);
+
+				if (waiting == null) {
+					waiting = new ArrayList<>();
+					histogramSnapshotFutures.put(name, waiting);
+				}
+
+				waiting.add(pending);
+			}
+
+			return pending;
+		}
+	}
+
+	/**
+	 * Minimal monitor-based {@link Future} that is completed from the outside via
+	 * {@link #complete(Object)}, {@link #fail(Exception)} or {@link #cancel(boolean)}.
+	 *
+	 * <p>All state is read and written while holding {@code lock}. Waiters re-check the state
+	 * inside the monitor before they wait, so a completion cannot slip in between the check
+	 * and the wait.
+	 *
+	 * <p>The future counts as done once it holds a non-null value or an exception; completing
+	 * it with {@code null} therefore does not mark it as done.
+	 *
+	 * @param <T> type of the value the future is completed with
+	 */
+	static class CompletableFuture<T> implements Future<T> {
+
+		private Exception exception = null;
+		private T value = null;
+
+		private final Object lock = new Object();
+
+		/**
+		 * Cancels the future unless it is already done.
+		 *
+		 * @param mayInterruptIfRunning not used by this implementation
+		 * @return true if this call cancelled the future, false if it was already done
+		 */
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			synchronized (lock) {
+				if (isDone()) {
+					return false;
+				} else {
+					exception = new CancellationException("Future was cancelled.");
+
+					lock.notifyAll();
+
+					return true;
+				}
+			}
+		}
+
+		@Override
+		public boolean isCancelled() {
+			synchronized (lock) {
+				return exception instanceof CancellationException;
+			}
+		}
+
+		@Override
+		public boolean isDone() {
+			synchronized (lock) {
+				return value != null || exception != null;
+			}
+		}
+
+		/**
+		 * Blocks until the future is done.
+		 *
+		 * @return the value the future was completed with
+		 * @throws InterruptedException if the waiting thread is interrupted
+		 * @throws ExecutionException if the future was failed or cancelled; the cause is the
+		 *                            failure respectively the CancellationException
+		 */
+		@Override
+		public T get() throws InterruptedException, ExecutionException {
+			synchronized (lock) {
+				while (!isDone()) {
+					lock.wait();
+				}
+
+				return valueOrThrow();
+			}
+		}
+
+		/**
+		 * Blocks until the future is done or the timeout has elapsed.
+		 *
+		 * @param timeout maximum time to wait
+		 * @param unit unit of {@code timeout}
+		 * @return the value the future was completed with
+		 * @throws InterruptedException if the waiting thread is interrupted
+		 * @throws ExecutionException if the future was failed or cancelled; the cause is the
+		 *                            failure respectively the CancellationException
+		 * @throws TimeoutException if the future is not done when the timeout has elapsed
+		 */
+		@Override
+		public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			final long timeoutNanos = unit.toNanos(timeout);
+			final long startNanos = System.nanoTime();
+
+			synchronized (lock) {
+				while (!isDone()) {
+					// work with the elapsed time instead of an absolute deadline so that a
+					// very large timeout cannot overflow
+					final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
+
+					if (remainingNanos <= 0) {
+						throw new TimeoutException("Future was not completed within " + timeout + " " + unit + ".");
+					}
+
+					// the remaining time is already in nanoseconds and must not be converted
+					// with the caller's unit again
+					TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
+				}
+
+				return valueOrThrow();
+			}
+		}
+
+		/**
+		 * Completes the future with the given value unless it is already done.
+		 *
+		 * @param value value to complete the future with
+		 * @return true if this call set the value, false if the future was already done
+		 */
+		public boolean complete(T value) {
+			synchronized (lock) {
+				if (!isDone()) {
+					this.value = value;
+
+					lock.notifyAll();
+
+					return true;
+				} else {
+					return false;
+				}
+			}
+		}
+
+		/**
+		 * Fails the future with the given exception unless it is already done.
+		 *
+		 * @param exception exception to fail the future with
+		 * @return true if this call set the exception, false if the future was already done
+		 */
+		public boolean fail(Exception exception) {
+			synchronized (lock) {
+				if (!isDone()) {
+					this.exception = exception;
+
+					lock.notifyAll();
+
+					return true;
+				} else {
+					return false;
+				}
+			}
+		}
+
+		/**
+		 * Returns the value of a done future or throws its exception wrapped in an
+		 * ExecutionException. Must be called while holding {@code lock}.
+		 */
+		private T valueOrThrow() throws ExecutionException {
+			if (exception != null) {
+				throw new ExecutionException(exception);
+			}
+
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml
new file mode 100644
index 0000000..c4f51da
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-ganglia</artifactId>
+	<name>flink-metrics-ganglia</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>info.ganglia.gmetric4j</groupId>
+			<artifactId>gmetric4j</artifactId>
+			<version>1.0.7</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-ganglia</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
new file mode 100644
index 0000000..adf9394
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.ganglia;
+
+import com.codahale.metrics.ScheduledReporter;
+
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ScheduledDropwizardReporter that creates a dropwizard Ganglia reporter from the values
+ * found in the given configuration.
+ */
+@PublicEvolving
+public class GangliaReporter extends ScheduledDropwizardReporter {
+
+	public static final String ARG_DMAX = "dmax";
+	public static final String ARG_TMAX = "tmax";
+	public static final String ARG_TTL = "ttl";
+	public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+	/**
+	 * Builds the dropwizard Ganglia reporter for the inherited registry.
+	 *
+	 * @param config configuration that provides host, port and the optional settings
+	 * @return the configured dropwizard reporter
+	 * @throws IllegalArgumentException if the host is missing or empty, or the port is
+	 *                                  smaller than 1
+	 * @throws RuntimeException if an IOException occurs while setting up the reporter
+	 */
+	@Override
+	public ScheduledReporter getReporter(Configuration config) {
+
+		try {
+			final String gangliaHost = config.getString(ARG_HOST, null);
+			final int gangliaPort = config.getInteger(ARG_PORT, -1);
+
+			final boolean hostMissing = gangliaHost == null || gangliaHost.length() == 0;
+			if (hostMissing || gangliaPort < 1) {
+				throw new IllegalArgumentException("Invalid host/port configuration. Host: " + gangliaHost + " Port: " + gangliaPort);
+			}
+
+			final String modeName = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+			final int timeToLive = config.getInteger(ARG_TTL, -1);
+			final GMetric.UDPAddressingMode mode = GMetric.UDPAddressingMode.valueOf(modeName);
+			final GMetric gMetric = new GMetric(gangliaHost, gangliaPort, mode, timeToLive);
+
+			final String metricPrefix = config.getString(ARG_PREFIX, null);
+			final String rateUnitName = config.getString(ARG_CONVERSION_RATE, null);
+			final String durationUnitName = config.getString(ARG_CONVERSION_DURATION, null);
+			final int dMax = config.getInteger(ARG_DMAX, 0);
+			final int tMax = config.getInteger(ARG_TMAX, 60);
+
+			final com.codahale.metrics.ganglia.GangliaReporter.Builder reporterBuilder =
+				com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+			// optional settings are only applied when they are present in the configuration
+			if (metricPrefix != null) {
+				reporterBuilder.prefixedWith(metricPrefix);
+			}
+			if (rateUnitName != null) {
+				reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnitName));
+			}
+			if (durationUnitName != null) {
+				reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnitName));
+			}
+			reporterBuilder.withDMax(dMax);
+			reporterBuilder.withTMax(tMax);
+
+			return reporterBuilder.build(gMetric);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while instantiating GangliaReporter.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml
new file mode 100644
index 0000000..45fb018
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-graphite</artifactId>
+	<name>flink-metrics-graphite</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
new file mode 100644
index 0000000..16be830
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ScheduledDropwizardReporter that creates a dropwizard Graphite reporter from the values
+ * found in the given configuration.
+ */
+@PublicEvolving
+public class GraphiteReporter extends ScheduledDropwizardReporter {
+
+	/**
+	 * Builds the dropwizard Graphite reporter for the inherited registry.
+	 *
+	 * @param config configuration that provides host, port and the optional settings
+	 * @return the configured dropwizard reporter
+	 * @throws IllegalArgumentException if the host is missing or empty, or the port is
+	 *                                  smaller than 1
+	 */
+	@Override
+	public ScheduledReporter getReporter(Configuration config) {
+		final String graphiteHost = config.getString(ARG_HOST, null);
+		final int graphitePort = config.getInteger(ARG_PORT, -1);
+
+		final boolean hostMissing = graphiteHost == null || graphiteHost.length() == 0;
+		if (hostMissing || graphitePort < 1) {
+			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + graphiteHost + " Port: " + graphitePort);
+		}
+
+		final String metricPrefix = config.getString(ARG_PREFIX, null);
+		final String rateUnitName = config.getString(ARG_CONVERSION_RATE, null);
+		final String durationUnitName = config.getString(ARG_CONVERSION_DURATION, null);
+
+		final com.codahale.metrics.graphite.GraphiteReporter.Builder reporterBuilder =
+			com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
+
+		// optional settings are only applied when they are present in the configuration
+		if (metricPrefix != null) {
+			reporterBuilder.prefixedWith(metricPrefix);
+		}
+
+		if (rateUnitName != null) {
+			reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnitName));
+		}
+
+		if (durationUnitName != null) {
+			reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnitName));
+		}
+
+		final Graphite graphite = new Graphite(graphiteHost, graphitePort);
+		return reporterBuilder.build(graphite);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml
new file mode 100644
index 0000000..8ee0b56
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-statsd</artifactId>
+	<name>flink-metrics-statsd</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
new file mode 100644
index 0000000..3d9bf07
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Largely based on the StatsDReporter class by ReadyTalk
+ * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * Ported since it was not present in maven central.
+ */
+@PublicEvolving
+public class StatsDReporter extends AbstractReporter implements Scheduled {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
+
+	public static final String ARG_HOST = "host";
+	public static final String ARG_PORT = "port";
+//	public static final String ARG_CONVERSION_RATE = "rateConversion";
+//	public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+	private boolean closed = false;
+
+	private DatagramSocket socket;
+	private InetSocketAddress address;
+
+	@Override
+	public void open(Configuration config) {
+		String host = config.getString(ARG_HOST, null);
+		int port = config.getInteger(ARG_PORT, -1);
+
+		if (host == null || host.length() == 0 || port < 1) {
+			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+		}
+
+		this.address = new InetSocketAddress(host, port);
+
+//		String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
+//		String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+//		this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+//		this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
+
+		try {
+			this.socket = new DatagramSocket(0);
+		} catch (SocketException e) {
+			throw new RuntimeException("Could not create datagram socket. ", e);
+		}
+	}
+
+	@Override
+	public void close() {
+		closed = true;
+		if (socket != null && !socket.isClosed()) {
+			socket.close();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void report() {
+		// instead of locking here, we tolerate exceptions
+		// we do this to prevent holding the lock for very long and blocking
+		// operator creation and shutdown
+		try {
+			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+				if (closed) {
+					return;
+				}
+				reportGauge(entry.getValue(), entry.getKey());
+			}
+
+			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+				if (closed) {
+					return;
+				}
+				reportCounter(entry.getValue(), entry.getKey());
+			}
+
+			for (Map.Entry<Histogram, String> entry : histograms.entrySet()) {
+				reportHistogram(entry.getValue(), entry.getKey());
+			}
+		}
+		catch (ConcurrentModificationException | NoSuchElementException e) {
+			// ignore - may happen when metrics are concurrently added or removed
+			// report next time
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	private void reportCounter(final String name, final Counter counter) {
+		send(name, String.valueOf(counter.getCount()));
+	}
+
+	private void reportGauge(final String name, final Gauge<?> gauge) {
+		Object value = gauge.getValue();
+		if (value != null) {
+			send(name, value.toString());
+		}
+	}
+
+	private void reportHistogram(final String name, final Histogram histogram) {
+		if (histogram != null) {
+
+			HistogramStatistics statistics = histogram.getStatistics();
+
+			if (statistics != null) {
+				send(prefix(name, "count"), String.valueOf(histogram.getCount()));
+				send(prefix(name, "max"), String.valueOf(statistics.getMax()));
+				send(prefix(name, "min"), String.valueOf(statistics.getMin()));
+				send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
+				send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
+				send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
+				send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
+				send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
+				send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
+				send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
+				send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+			}
+		}
+	}
+
+	private String prefix(String ... names) {
+		if (names.length > 0) {
+			StringBuilder stringBuilder = new StringBuilder(names[0]);
+
+			for (int i = 1; i < names.length; i++) {
+				stringBuilder.append('.').append(names[i]);
+			}
+
+			return stringBuilder.toString();
+		} else {
+			return "";
+		}
+	}
+
+	private void send(final String name, final String value) {
+		try {
+			String formatted = String.format("%s:%s|g", name, value);
+			byte[] data = formatted.getBytes();
+			socket.send(new DatagramPacket(data, data.length, this.address));
+		}
+		catch (IOException e) {
+			LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
new file mode 100644
index 0000000..5f5ef40
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatsDReporter} that run the reporter against a local UDP
+ * socket and inspect the datagrams it sends.
+ */
+public class StatsDReporterTest extends TestLogger {
+
+	/**
+	 * Tests that histograms are properly reported via the StatsD reporter
+	 */
+	@Test
+	public void testStatsDHistogramReporting() throws Exception {
+		MetricRegistry registry = null;
+		DatagramSocketReceiver receiver = null;
+		Thread receiverThread = null;
+		long timeout = 5000;
+		long joinTimeout = 30000;
+
+		String histogramName = "histogram";
+
+		try {
+			receiver = new DatagramSocketReceiver();
+
+			receiverThread = new Thread(receiver);
+
+			receiverThread.start();
+
+			int port = receiver.getPort();
+
+			Configuration config = new Configuration();
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS");
+			config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
+
+			registry = new MetricRegistry(config);
+
+			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+			TestingHistogram histogram = new TestingHistogram();
+
+			metricGroup.histogram(histogramName, histogram);
+
+			// the reporter sends one datagram per histogram statistic, 11 in total
+			receiver.waitUntilNumLines(11, timeout);
+
+			Set<String> lines = receiver.getLines();
+
+			String prefix = metricGroup.getScopeString() + "." + histogramName;
+
+			Set<String> expectedLines = new HashSet<>();
+
+			// the values are the distinct sentinels returned by TestingHistogram, which
+			// makes it visible if the reporter mixes up two statistics (e.g. min and max)
+			expectedLines.add(prefix + ".count:1|g");
+			expectedLines.add(prefix + ".mean:3.0|g");
+			expectedLines.add(prefix + ".min:6|g");
+			expectedLines.add(prefix + ".max:5|g");
+			expectedLines.add(prefix + ".stddev:4.0|g");
+			expectedLines.add(prefix + ".p75:0.75|g");
+			expectedLines.add(prefix + ".p98:0.98|g");
+			expectedLines.add(prefix + ".p99:0.99|g");
+			expectedLines.add(prefix + ".p999:0.999|g");
+			expectedLines.add(prefix + ".p95:0.95|g");
+			expectedLines.add(prefix + ".p50:0.5|g");
+
+			assertEquals(expectedLines, lines);
+
+		} finally {
+			if (registry != null) {
+				registry.shutdown();
+			}
+
+			if (receiver != null) {
+				receiver.stop();
+			}
+
+			if (receiverThread != null) {
+				receiverThread.join(joinTimeout);
+			}
+		}
+	}
+
+	/**
+	 * Histogram stub returning a fixed, distinct value for every statistic and
+	 * echoing the requested quantile back as its value.
+	 */
+	public static class TestingHistogram implements Histogram {
+
+		@Override
+		public void update(long value) {
+			// updates are irrelevant for this stub, all statistics are constant
+		}
+
+		@Override
+		public long getCount() {
+			return 1;
+		}
+
+		@Override
+		public HistogramStatistics getStatistics() {
+			return new HistogramStatistics() {
+				@Override
+				public double getQuantile(double quantile) {
+					return quantile;
+				}
+
+				@Override
+				public long[] getValues() {
+					return new long[0];
+				}
+
+				@Override
+				public int size() {
+					return 2;
+				}
+
+				@Override
+				public double getMean() {
+					return 3;
+				}
+
+				@Override
+				public double getStdDev() {
+					return 4;
+				}
+
+				@Override
+				public long getMax() {
+					return 5;
+				}
+
+				@Override
+				public long getMin() {
+					return 6;
+				}
+			};
+		}
+	}
+
+	/**
+	 * Receives datagrams on a local UDP socket and collects the distinct payloads
+	 * as lines, until {@link #stop()} is called.
+	 */
+	public static class DatagramSocketReceiver implements Runnable {
+		/** Dummy map value; the map is only used as a concurrent set of lines. */
+		private static final Object obj = new Object();
+
+		private final DatagramSocket socket;
+		/** Received lines; also serves as the monitor for wait/notify. */
+		private final ConcurrentHashMap<String, Object> lines;
+
+		/** Written by {@link #stop()} from the test thread, read by the receiver thread. */
+		private volatile boolean running = true;
+
+		public DatagramSocketReceiver() throws SocketException {
+			socket = new DatagramSocket();
+			lines = new ConcurrentHashMap<>();
+		}
+
+		/** Returns the local port the receiver is bound to. */
+		public int getPort() {
+			return socket.getLocalPort();
+		}
+
+		/** Ends the receive loop; closing the socket unblocks a pending receive. */
+		public void stop() {
+			running = false;
+			socket.close();
+		}
+
+		/**
+		 * Blocks until at least the given number of distinct lines has been received.
+		 *
+		 * @param numberLines number of distinct lines to wait for
+		 * @param timeout maximum time to wait, in milliseconds
+		 * @throws TimeoutException if fewer lines than requested have arrived once the timeout elapsed
+		 */
+		public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException {
+			long endTimeout = System.currentTimeMillis() + timeout;
+			long remainingTimeout = timeout;
+			boolean interrupted = false;
+
+			try {
+				// The condition is checked while holding the monitor: the receiver can only
+				// notify while holding it as well, so no notification can slip in between
+				// the check and the wait.
+				synchronized (lines) {
+					while (numberLines > lines.size() && remainingTimeout > 0) {
+						try {
+							lines.wait(remainingTimeout);
+						} catch (InterruptedException e) {
+							// keep waiting, but remember the interruption for the caller
+							interrupted = true;
+						}
+
+						remainingTimeout = endTimeout - System.currentTimeMillis();
+					}
+
+					// decide on the actual condition, not on the clock: lines that arrived
+					// right at the deadline still count
+					if (numberLines > lines.size()) {
+						throw new TimeoutException("Have not received " + numberLines + " in time.");
+					}
+				}
+			} finally {
+				if (interrupted) {
+					Thread.currentThread().interrupt();
+				}
+			}
+		}
+
+		/** Returns a snapshot of the distinct lines received so far. */
+		public Set<String> getLines() {
+			// copy, so the result does not change while the receiver thread keeps adding lines
+			return new HashSet<>(lines.keySet());
+		}
+
+		@Override
+		public void run() {
+			while (running) {
+				try {
+					byte[] buffer = new byte[1024];
+
+					DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+					socket.receive(packet);
+
+					// decode explicitly so the result does not depend on the platform default charset
+					String line = new String(
+						packet.getData(), 0, packet.getLength(), java.nio.charset.StandardCharsets.UTF_8);
+
+					lines.put(line, obj);
+
+					synchronized (lines) {
+						lines.notifyAll();
+					}
+				} catch (IOException ex) {
+					// ignore the exceptions; closing the socket in stop() ends up here
+					// and the loop terminates through the running flag
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
new file mode 100644
index 0000000..542f49c
--- /dev/null
+++ b/flink-metrics/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics</artifactId>
+	<name>flink-metrics</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-metrics-dropwizard</module>
+		<module>flink-metrics-ganglia</module>
+		<module>flink-metrics-graphite</module>
+		<module>flink-metrics-statsd</module>
+	</modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9dc8846..9da3fa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
 		<module>flink-quickstart</module>
 		<module>flink-contrib</module>
 		<module>flink-dist</module>
-		<module>flink-metric-reporters</module>
+		<module>flink-metrics</module>
 	</modules>
 
 	<properties>