You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 13:59:19 UTC
[2/3] flink git commit: [FLINK-3951] Add Histogram metric type
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
new file mode 100644
index 0000000..2479c26
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_REPORTER_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
+
+ /**
+  * Verifies count, minimum, maximum, size and median reported by a
+  * DropwizardHistogramWrapper that is backed by a sliding window reservoir, first while
+  * the window is filling up and then once newer values start replacing older ones.
+  */
+ @Test
+ public void testDropwizardHistogramWrapper() {
+     final int size = 10;
+     final com.codahale.metrics.Histogram dropwizardHistogram =
+         new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size));
+     final DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(dropwizardHistogram);
+
+     // Phase 1: insert 0 .. size-1. The expected minimum stays at 0 throughout.
+     int value = 0;
+     while (value < size) {
+         histogramWrapper.update(value);
+
+         assertEquals(value + 1, histogramWrapper.getCount());
+         assertEquals(value, histogramWrapper.getStatistics().getMax());
+         assertEquals(0, histogramWrapper.getStatistics().getMin());
+
+         value++;
+     }
+
+     assertEquals(size, histogramWrapper.getStatistics().size());
+     assertEquals((size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+
+     // Phase 2: insert size .. 2*size-1. The total count keeps growing, while the expected
+     // minimum now moves up by one with every new value.
+     while (value < 2 * size) {
+         histogramWrapper.update(value);
+
+         assertEquals(value + 1, histogramWrapper.getCount());
+         assertEquals(value, histogramWrapper.getStatistics().getMax());
+         assertEquals(value + 1 - size, histogramWrapper.getStatistics().getMin());
+
+         value++;
+     }
+
+     assertEquals(size, histogramWrapper.getStatistics().size());
+     assertEquals(size + (size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+ }
+
+ /**
+  * Tests that the DropwizardHistogramWrapper reports correct dropwizard snapshots to the
+  * ScheduledReporter.
+  *
+  * <p>The test configures a {@link MetricRegistry} with the {@link TestingReporter},
+  * registers a histogram on a task manager metric group, feeds it the values
+  * {@code 0 .. size-1} and then waits (up to {@code timeout} milliseconds) for the next
+  * snapshot the reporter receives for that histogram. Finally it checks that
+  * unregistering the histogram removes it from the reporter again.
+  *
+  * @throws Exception if the reflective access to the registry fails or the snapshot
+  *                   cannot be obtained from the future
+  */
+ @Test
+ public void testDropwizardHistogramWrapperReporting() throws Exception {
+     long reportingInterval = 1000;
+     long timeout = 30000;
+     int size = 10;
+     String histogramMetricName = "histogram";
+     Configuration config = new Configuration();
+     config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestingReporter.class.getName());
+     config.setString(KEY_METRICS_REPORTER_INTERVAL, reportingInterval + " MILLISECONDS");
+
+     MetricRegistry registry = null;
+
+     try {
+         registry = new MetricRegistry(config);
+         DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
+             new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
+
+         TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+         metricGroup.histogram(histogramMetricName, histogramWrapper);
+
+         String fullMetricName = metricGroup.getScopeString() + "." + histogramMetricName;
+
+         // The registry's reporter is read reflectively from its declared "reporter" field.
+         Field f = registry.getClass().getDeclaredField("reporter");
+         f.setAccessible(true);
+
+         MetricReporter reporter = (MetricReporter) f.get(registry);
+
+         assertTrue(reporter instanceof TestingReporter);
+
+         TestingReporter testingReporter = (TestingReporter) reporter;
+
+         TestingScheduledReporter scheduledReporter = testingReporter.scheduledReporter;
+
+         // check that the metric has been registered
+         assertEquals(1, testingReporter.getMetrics().size());
+
+         for (int i = 0; i < size; i++) {
+             histogramWrapper.update(i);
+         }
+
+         // Ask for the snapshot of the *next* report only after all values were added.
+         Future<Snapshot> snapshotFuture = scheduledReporter.getNextHistogramSnapshot(fullMetricName);
+
+         Snapshot snapshot = snapshotFuture.get(timeout, TimeUnit.MILLISECONDS);
+
+         assertEquals(0, snapshot.getMin());
+         assertEquals((size - 1) / 2.0, snapshot.getMedian(), 0.001);
+         assertEquals(size - 1, snapshot.getMax());
+         assertEquals(size, snapshot.size());
+
+         // Unregister under the same name that was used for registration above.
+         registry.unregister(histogramWrapper, histogramMetricName, metricGroup);
+
+         // check that the metric has been de-registered
+         assertEquals(0, testingReporter.getMetrics().size());
+     } finally {
+         if (registry != null) {
+             registry.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Reporter used by the tests. It creates a {@link TestingScheduledReporter} as its
+  * dropwizard reporter and keeps a reference to it in {@code scheduledReporter} so that
+  * a test can request reported snapshots from it.
+  */
+ public static class TestingReporter extends ScheduledDropwizardReporter {
+     TestingScheduledReporter scheduledReporter = null;
+
+     /**
+      * Creates the testing reporter on top of this reporter's dropwizard registry and
+      * remembers it. No metric filter is passed; both time units are milliseconds.
+      */
+     @Override
+     public ScheduledReporter getReporter(Configuration config) {
+         final TestingScheduledReporter created = new TestingScheduledReporter(
+             registry,
+             getClass().getName(),
+             null,
+             TimeUnit.MILLISECONDS,
+             TimeUnit.MILLISECONDS);
+
+         scheduledReporter = created;
+         return created;
+     }
+
+     /** Returns the metrics currently held by the dropwizard registry. */
+     public Map<String, com.codahale.metrics.Metric> getMetrics() {
+         return registry.getMetrics();
+     }
+ }
+
+ /**
+  * ScheduledReporter that records the latest snapshot of every reported histogram and
+  * lets callers obtain a future which is completed with the snapshot of the next report
+  * for a given histogram name.
+  */
+ static class TestingScheduledReporter extends ScheduledReporter {
+
+     final Map<String, Snapshot> histogramSnapshots = new HashMap<>();
+     // Futures waiting for the next report, keyed by histogram name. Accessed only while
+     // holding the monitor of this map.
+     final Map<String, List<CompletableFuture<Snapshot>>> histogramSnapshotFutures = new HashMap<>();
+
+     protected TestingScheduledReporter(com.codahale.metrics.MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit) {
+         super(registry, name, filter, rateUnit, durationUnit);
+     }
+
+     /** Handles only the histograms of a report; all other metric types are ignored. */
+     @Override
+     public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, com.codahale.metrics.Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+         for (Map.Entry<String, com.codahale.metrics.Histogram> reported: histograms.entrySet()) {
+             final String histogramName = reported.getKey();
+             final com.codahale.metrics.Histogram reportedHistogram = reported.getValue();
+             reportHistogram(histogramName, reportedHistogram);
+         }
+     }
+
+     /**
+      * Stores the current snapshot of the histogram and completes all futures that were
+      * waiting for a report of {@code name}. Every future receives a freshly taken snapshot.
+      */
+     void reportHistogram(String name, com.codahale.metrics.Histogram histogram) {
+         histogramSnapshots.put(name, histogram.getSnapshot());
+
+         synchronized (histogramSnapshotFutures) {
+             // Only non-null lists are ever stored, so null means nobody is waiting.
+             final List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.remove(name);
+             if (waiting == null) {
+                 return;
+             }
+
+             for (CompletableFuture<Snapshot> pending: waiting) {
+                 pending.complete(histogram.getSnapshot());
+             }
+         }
+     }
+
+     /**
+      * Registers and returns a new future that is completed by the next call to
+      * {@code reportHistogram} for the given name.
+      */
+     Future<Snapshot> getNextHistogramSnapshot(String name) {
+         synchronized (histogramSnapshotFutures) {
+             List<CompletableFuture<Snapshot>> waiting = histogramSnapshotFutures.get(name);
+             if (waiting == null) {
+                 waiting = new ArrayList<>();
+                 histogramSnapshotFutures.put(name, waiting);
+             }
+
+             final CompletableFuture<Snapshot> next = new CompletableFuture<>();
+             waiting.add(next);
+             return next;
+         }
+     }
+ }
+
+ /**
+  * Minimal {@link Future} that is completed explicitly through {@link #complete(Object)},
+  * {@link #fail(Exception)} or {@link #cancel(boolean)}. The first of these calls wins;
+  * later ones return {@code false} and leave the outcome untouched.
+  *
+  * <p>All state is guarded by a single monitor. Waiters re-check the completion flag
+  * while holding that monitor, so a completion can never slip in between the check and
+  * the wait. A {@code null} value is a valid result.
+  *
+  * @param <T> type of the value the future is completed with
+  */
+ static class CompletableFuture<T> implements Future<T> {
+
+     private final Object lock = new Object();
+
+     // The three fields below are only read or written while holding 'lock'.
+     private boolean done = false;
+     private Exception exception = null;
+     private T value = null;
+
+     /**
+      * Cancels the future unless it is already done.
+      *
+      * @param mayInterruptIfRunning ignored, there is no running computation to interrupt
+      * @return {@code true} if this call cancelled the future
+      */
+     @Override
+     public boolean cancel(boolean mayInterruptIfRunning) {
+         return finish(null, new CancellationException("Future was cancelled."));
+     }
+
+     @Override
+     public boolean isCancelled() {
+         synchronized (lock) {
+             return exception instanceof CancellationException;
+         }
+     }
+
+     @Override
+     public boolean isDone() {
+         synchronized (lock) {
+             return done;
+         }
+     }
+
+     /**
+      * Blocks until the future is done.
+      *
+      * @return the value passed to {@link #complete(Object)}
+      * @throws CancellationException if the future was cancelled
+      * @throws ExecutionException if the future was failed; the cause is the given exception
+      * @throws InterruptedException if the waiting thread is interrupted
+      */
+     @Override
+     public T get() throws InterruptedException, ExecutionException {
+         synchronized (lock) {
+             while (!done) {
+                 lock.wait();
+             }
+
+             return reportResult();
+         }
+     }
+
+     /**
+      * Blocks until the future is done or the timeout has elapsed.
+      *
+      * @param timeout maximum time to wait, expressed in {@code unit}
+      * @param unit unit of {@code timeout}
+      * @return the value passed to {@link #complete(Object)}
+      * @throws CancellationException if the future was cancelled
+      * @throws ExecutionException if the future was failed; the cause is the given exception
+      * @throws InterruptedException if the waiting thread is interrupted
+      * @throws TimeoutException if the future is still not done when the timeout elapses
+      */
+     @Override
+     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+         // Convert exactly once and track the deadline on the monotonic clock.
+         final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
+
+         synchronized (lock) {
+             while (!done) {
+                 final long remainingNanos = deadlineNanos - System.nanoTime();
+                 if (remainingNanos <= 0) {
+                     throw new TimeoutException("Future was not completed within " + timeout + " " + unit + ".");
+                 }
+
+                 // Never degrades to wait(0), which would block without a time limit.
+                 TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
+             }
+
+             return reportResult();
+         }
+     }
+
+     /**
+      * Completes the future with the given value unless it is already done.
+      *
+      * @param value result handed to callers of {@code get}; may be {@code null}
+      * @return {@code true} if this call completed the future
+      */
+     public boolean complete(T value) {
+         return finish(value, null);
+     }
+
+     /**
+      * Fails the future with the given exception unless it is already done.
+      *
+      * @param exception cause reported to callers of {@code get}; must not be {@code null}
+      * @return {@code true} if this call completed the future
+      * @throws NullPointerException if {@code exception} is {@code null}
+      */
+     public boolean fail(Exception exception) {
+         if (exception == null) {
+             throw new NullPointerException("exception");
+         }
+
+         return finish(null, exception);
+     }
+
+     /** Records the outcome and wakes up all waiters; only the first call has an effect. */
+     private boolean finish(T result, Exception failure) {
+         synchronized (lock) {
+             if (done) {
+                 return false;
+             }
+
+             value = result;
+             exception = failure;
+             done = true;
+
+             lock.notifyAll();
+
+             return true;
+         }
+     }
+
+     /** Translates the recorded outcome into a return value or exception; caller holds 'lock'. */
+     private T reportResult() throws ExecutionException {
+         if (exception instanceof CancellationException) {
+             throw (CancellationException) exception;
+         } else if (exception != null) {
+             throw new ExecutionException(exception);
+         } else {
+             return value;
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+ <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml
new file mode 100644
index 0000000..c4f51da
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-ganglia</artifactId>
+ <name>flink-metrics-ganglia</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>info.ganglia.gmetric4j</groupId>
+ <artifactId>gmetric4j</artifactId>
+ <version>1.0.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
new file mode 100644
index 0000000..adf9394
--- /dev/null
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.ganglia;
+
+import com.codahale.metrics.ScheduledReporter;
+
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+@PublicEvolving
+public class GangliaReporter extends ScheduledDropwizardReporter {
+
+ public static final String ARG_DMAX = "dmax";
+ public static final String ARG_TMAX = "tmax";
+ public static final String ARG_TTL = "ttl";
+ public static final String ARG_MODE_ADDRESSING = "addressingMode";
+
+ @Override
+ public ScheduledReporter getReporter(Configuration config) {
+
+ try {
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+ String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST");
+ int ttl = config.getInteger(ARG_TTL, -1);
+ GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl);
+
+ String prefix = config.getString(ARG_PREFIX, null);
+ String conversionRate = config.getString(ARG_CONVERSION_RATE, null);
+ String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null);
+ int dMax = config.getInteger(ARG_DMAX, 0);
+ int tMax = config.getInteger(ARG_TMAX, 60);
+
+ com.codahale.metrics.ganglia.GangliaReporter.Builder builder =
+ com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry);
+
+ if (prefix != null) {
+ builder.prefixedWith(prefix);
+ }
+ if (conversionRate != null) {
+ builder.convertRatesTo(TimeUnit.valueOf(conversionRate));
+ }
+ if (conversionDuration != null) {
+ builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration));
+ }
+ builder.withDMax(dMax);
+ builder.withTMax(tMax);
+
+ return builder.build(gMetric);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while instantiating GangliaReporter.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml
new file mode 100644
index 0000000..45fb018
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-graphite</artifactId>
+ <name>flink-metrics-graphite</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
new file mode 100644
index 0000000..16be830
--- /dev/null
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.graphite;
+
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that builds a dropwizard Graphite reporter from the given configuration.
+ *
+ * <p>Host and port are mandatory. Prefix and rate/duration conversion units are optional.
+ */
+@PublicEvolving
+public class GraphiteReporter extends ScheduledDropwizardReporter {
+
+ /**
+  * Creates the dropwizard Graphite reporter described by {@code config}.
+  *
+  * @throws IllegalArgumentException if the host is missing or empty, or the port is
+  *                                  smaller than 1
+  */
+ @Override
+ public ScheduledReporter getReporter(Configuration config) {
+     final String graphiteHost = config.getString(ARG_HOST, null);
+     final int graphitePort = config.getInteger(ARG_PORT, -1);
+
+     final boolean hostGiven = graphiteHost != null && graphiteHost.length() != 0;
+     if (!(hostGiven && graphitePort >= 1)) {
+         throw new IllegalArgumentException("Invalid host/port configuration. Host: " + graphiteHost + " Port: " + graphitePort);
+     }
+
+     final String metricPrefix = config.getString(ARG_PREFIX, null);
+     final String rateUnitName = config.getString(ARG_CONVERSION_RATE, null);
+     final String durationUnitName = config.getString(ARG_CONVERSION_DURATION, null);
+
+     final com.codahale.metrics.graphite.GraphiteReporter.Builder reporterBuilder =
+         com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry);
+
+     // Optional settings are only applied when they are present in the configuration.
+     if (metricPrefix != null) {
+         reporterBuilder.prefixedWith(metricPrefix);
+     }
+
+     if (rateUnitName != null) {
+         reporterBuilder.convertRatesTo(TimeUnit.valueOf(rateUnitName));
+     }
+
+     if (durationUnitName != null) {
+         reporterBuilder.convertDurationsTo(TimeUnit.valueOf(durationUnitName));
+     }
+
+     final Graphite graphite = new Graphite(graphiteHost, graphitePort);
+     return reporterBuilder.build(graphite);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml
new file mode 100644
index 0000000..8ee0b56
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-statsd</artifactId>
+ <name>flink-metrics-statsd</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
new file mode 100644
index 0000000..3d9bf07
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Largely based on the StatsDReporter class by ReadyTalk
+ * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * Ported since it was not present in maven central.
+ */
+@PublicEvolving
+public class StatsDReporter extends AbstractReporter implements Scheduled {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
+
+ public static final String ARG_HOST = "host";
+ public static final String ARG_PORT = "port";
+// public static final String ARG_CONVERSION_RATE = "rateConversion";
+// public static final String ARG_CONVERSION_DURATION = "durationConversion";
+
+ private boolean closed = false;
+
+ private DatagramSocket socket;
+ private InetSocketAddress address;
+
+ @Override
+ public void open(Configuration config) {
+ String host = config.getString(ARG_HOST, null);
+ int port = config.getInteger(ARG_PORT, -1);
+
+ if (host == null || host.length() == 0 || port < 1) {
+ throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
+ }
+
+ this.address = new InetSocketAddress(host, port);
+
+// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
+// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
+// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
+// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
+
+ try {
+ this.socket = new DatagramSocket(0);
+ } catch (SocketException e) {
+ throw new RuntimeException("Could not create datagram socket. ", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ if (socket != null && !socket.isClosed()) {
+ socket.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void report() {
+ // instead of locking here, we tolerate exceptions
+ // we do this to prevent holding the lock for very long and blocking
+ // operator creation and shutdown
+ try {
+ for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+ if (closed) {
+ return;
+ }
+ reportGauge(entry.getValue(), entry.getKey());
+ }
+
+ for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+ if (closed) {
+ return;
+ }
+ reportCounter(entry.getValue(), entry.getKey());
+ }
+
+ for (Map.Entry<Histogram, String> entry : histograms.entrySet()) {
+ reportHistogram(entry.getValue(), entry.getKey());
+ }
+ }
+ catch (ConcurrentModificationException | NoSuchElementException e) {
+ // ignore - may happen when metrics are concurrently added or removed
+ // report next time
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void reportCounter(final String name, final Counter counter) {
+ send(name, String.valueOf(counter.getCount()));
+ }
+
+ private void reportGauge(final String name, final Gauge<?> gauge) {
+ Object value = gauge.getValue();
+ if (value != null) {
+ send(name, value.toString());
+ }
+ }
+
+ private void reportHistogram(final String name, final Histogram histogram) {
+ if (histogram != null) {
+
+ HistogramStatistics statistics = histogram.getStatistics();
+
+ if (statistics != null) {
+ send(prefix(name, "count"), String.valueOf(histogram.getCount()));
+ send(prefix(name, "max"), String.valueOf(statistics.getMax()));
+ send(prefix(name, "min"), String.valueOf(statistics.getMin()));
+ send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
+ send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
+ send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
+ send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
+ send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
+ send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
+ send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
+ send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+ }
+ }
+ }
+
+ private String prefix(String ... names) {
+ if (names.length > 0) {
+ StringBuilder stringBuilder = new StringBuilder(names[0]);
+
+ for (int i = 1; i < names.length; i++) {
+ stringBuilder.append('.').append(names[i]);
+ }
+
+ return stringBuilder.toString();
+ } else {
+ return "";
+ }
+ }
+
+ private void send(final String name, final String value) {
+ try {
+ String formatted = String.format("%s:%s|g", name, value);
+ byte[] data = formatted.getBytes();
+ socket.send(new DatagramPacket(data, data.length, this.address));
+ }
+ catch (IOException e) {
+ LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
new file mode 100644
index 0000000..5f5ef40
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatsDReporter}.
+ */
+public class StatsDReporterTest extends TestLogger {
+
+    /**
+     * Tests that histograms are properly reported via the StatsD reporter.
+     *
+     * <p>A local UDP receiver stands in for the StatsD server; the test registers one histogram
+     * and compares the received lines with the expected gauge lines.
+     */
+    @Test
+    public void testStatsDHistogramReporting() throws Exception {
+        MetricRegistry registry = null;
+        DatagramSocketReceiver receiver = null;
+        Thread receiverThread = null;
+        long timeout = 5000;
+        long joinTimeout = 30000;
+
+        String histogramName = "histogram";
+
+        try {
+            receiver = new DatagramSocketReceiver();
+
+            receiverThread = new Thread(receiver);
+
+            receiverThread.start();
+
+            int port = receiver.getPort();
+
+            Configuration config = new Configuration();
+            config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, StatsDReporter.class.getName());
+            config.setString(MetricRegistry.KEY_METRICS_REPORTER_INTERVAL, "1 SECONDS");
+            config.setString(MetricRegistry.KEY_METRICS_REPORTER_ARGUMENTS, "--host localhost --port " + port);
+
+            registry = new MetricRegistry(config);
+
+            TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+            TestingHistogram histogram = new TestingHistogram();
+
+            metricGroup.histogram(histogramName, histogram);
+
+            // 11 = number of distinct lines expected for one histogram (see expectedLines below)
+            receiver.waitUntilNumLines(11, timeout);
+
+            Set<String> lines = receiver.getLines();
+
+            String prefix = metricGroup.getScopeString() + "." + histogramName;
+
+            Set<String> expectedLines = new HashSet<>();
+
+            // the values are the constants returned by TestingHistogram
+            expectedLines.add(prefix + ".count:1|g");
+            expectedLines.add(prefix + ".mean:3.0|g");
+            expectedLines.add(prefix + ".min:6|g");
+            expectedLines.add(prefix + ".max:5|g");
+            expectedLines.add(prefix + ".stddev:4.0|g");
+            expectedLines.add(prefix + ".p75:0.75|g");
+            expectedLines.add(prefix + ".p98:0.98|g");
+            expectedLines.add(prefix + ".p99:0.99|g");
+            expectedLines.add(prefix + ".p999:0.999|g");
+            expectedLines.add(prefix + ".p95:0.95|g");
+            expectedLines.add(prefix + ".p50:0.5|g");
+
+            assertEquals(expectedLines, lines);
+
+        } finally {
+            if (registry != null) {
+                registry.shutdown();
+            }
+
+            if (receiver != null) {
+                receiver.stop();
+            }
+
+            if (receiverThread != null) {
+                receiverThread.join(joinTimeout);
+            }
+        }
+    }
+
+    /**
+     * Histogram stub that returns a different fixed value for every statistic
+     * (count 1, size 2, mean 3, stddev 4, max 5, min 6; a quantile is returned as itself),
+     * so that each reported line can be attributed to exactly one getter.
+     */
+    public static class TestingHistogram implements Histogram {
+
+        @Override
+        public void update(long value) {
+            // updates are ignored, the statistics are constant
+        }
+
+        @Override
+        public long getCount() {
+            return 1;
+        }
+
+        @Override
+        public HistogramStatistics getStatistics() {
+            return new HistogramStatistics() {
+                @Override
+                public double getQuantile(double quantile) {
+                    return quantile;
+                }
+
+                @Override
+                public long[] getValues() {
+                    return new long[0];
+                }
+
+                @Override
+                public int size() {
+                    return 2;
+                }
+
+                @Override
+                public double getMean() {
+                    return 3;
+                }
+
+                @Override
+                public double getStdDev() {
+                    return 4;
+                }
+
+                @Override
+                public long getMax() {
+                    return 5;
+                }
+
+                @Override
+                public long getMin() {
+                    return 6;
+                }
+            };
+        }
+    }
+
+    /**
+     * Receives UDP datagrams on a system-chosen local port and collects the distinct payloads
+     * as lines. Meant to be run in its own thread; {@link #stop()} ends the receive loop.
+     */
+    public static class DatagramSocketReceiver implements Runnable {
+        /** Dummy map value; the map is only used as a concurrent set of lines. */
+        private static final Object obj = new Object();
+
+        private final DatagramSocket socket;
+        /** Distinct lines received so far; also serves as the monitor for waiting and notifying. */
+        private final ConcurrentHashMap<String, Object> lines;
+
+        /** Volatile: written by {@link #stop()} and read by the receiver thread in {@link #run()}. */
+        private volatile boolean running = true;
+
+        public DatagramSocketReceiver() throws SocketException {
+            socket = new DatagramSocket();
+            lines = new ConcurrentHashMap<>();
+        }
+
+        /** Returns the local port the receiver socket is bound to. */
+        public int getPort() {
+            return socket.getLocalPort();
+        }
+
+        /** Ends the receive loop; closing the socket makes a blocked receive call fail and return. */
+        public void stop() {
+            running = false;
+            socket.close();
+        }
+
+        /**
+         * Blocks until at least the given number of distinct lines has been received.
+         *
+         * @param numberLines number of distinct lines to wait for
+         * @param timeout maximum time to wait, in milliseconds
+         * @throws TimeoutException if fewer lines than requested have arrived once the timeout elapsed
+         */
+        public void waitUntilNumLines(int numberLines, long timeout) throws TimeoutException {
+            long endTimeout = System.currentTimeMillis() + timeout;
+            long remainingTimeout = timeout;
+
+            // The condition is checked while holding the monitor, so a notification sent between
+            // the check and the wait cannot be missed.
+            synchronized (lines) {
+                while (numberLines > lines.size() && remainingTimeout > 0) {
+                    try {
+                        lines.wait(remainingTimeout);
+                    } catch (InterruptedException e) {
+                        // ignore interruption exceptions
+                    }
+
+                    remainingTimeout = endTimeout - System.currentTimeMillis();
+                }
+
+                // decide on the actual condition, not on the clock: lines that arrived just in
+                // time must not be reported as a timeout
+                if (numberLines > lines.size()) {
+                    throw new TimeoutException("Have not received " + numberLines + " in time.");
+                }
+            }
+        }
+
+        /** Returns a snapshot of the distinct lines received so far. */
+        public Set<String> getLines() {
+            // copy: the receiver thread may still be adding lines
+            return new HashSet<>(lines.keySet());
+        }
+
+        @Override
+        public void run() {
+            while (running) {
+                try {
+                    byte[] buffer = new byte[1024];
+
+                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+                    socket.receive(packet);
+
+                    // fixed charset, so decoding does not depend on the JVM's default encoding
+                    String line = new String(
+                        packet.getData(), 0, packet.getLength(), java.nio.charset.StandardCharsets.UTF_8);
+
+                    synchronized (lines) {
+                        lines.put(line, obj);
+                        lines.notifyAll();
+                    }
+                } catch (IOException ex) {
+                    // ignore the exceptions
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1c4ea08
--- /dev/null
+++ b/flink-metrics/flink-metrics-statsd/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <!-- Console appender used by the root logger below. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- By default only WARN and above is logged. -->
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <!-- The following loggers are switched off entirely. -->
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+ <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
new file mode 100644
index 0000000..542f49c
--- /dev/null
+++ b/flink-metrics/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits from the flink-parent POM in the parent directory. -->
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <!-- Aggregator module (packaging "pom"): it only groups the modules listed below. -->
+ <artifactId>flink-metrics</artifactId>
+ <name>flink-metrics</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-metrics-dropwizard</module>
+ <module>flink-metrics-ganglia</module>
+ <module>flink-metrics-graphite</module>
+ <module>flink-metrics-statsd</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c7a88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9dc8846..9da3fa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@ under the License.
<module>flink-quickstart</module>
<module>flink-contrib</module>
<module>flink-dist</module>
- <module>flink-metric-reporters</module>
+ <module>flink-metrics</module>
</modules>
<properties>