You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/10 07:06:35 UTC
[7/7] flink git commit: [FLINK-6013][metrics] Add Datadog HTTP
metrics reporter
[FLINK-6013][metrics] Add Datadog HTTP metrics reporter
This closes #3736.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54ceec16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54ceec16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54ceec16
Branch: refs/heads/master
Commit: 54ceec16c11655da4181c0816a3b12d1c4bab465
Parents: 50baec6
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Apr 18 10:27:17 2017 -0700
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200
----------------------------------------------------------------------
docs/monitoring/metrics.md | 24 +++
flink-dist/pom.xml | 7 +
flink-dist/src/main/assemblies/opt.xml | 7 +
flink-metrics/flink-metrics-datadog/pom.xml | 108 ++++++++++
.../apache/flink/metrics/datadog/DCounter.java | 44 ++++
.../apache/flink/metrics/datadog/DGauge.java | 45 ++++
.../apache/flink/metrics/datadog/DMeter.java | 42 ++++
.../apache/flink/metrics/datadog/DMetric.java | 84 ++++++++
.../apache/flink/metrics/datadog/DSeries.java | 45 ++++
.../metrics/datadog/DatadogHttpClient.java | 97 +++++++++
.../metrics/datadog/DatadogHttpReporter.java | 210 +++++++++++++++++++
.../flink/metrics/datadog/MetricType.java | 30 +++
.../metrics/datadog/DatadogHttpClientTest.java | 199 ++++++++++++++++++
.../src/test/resources/log4j-test.properties | 27 +++
flink-metrics/pom.xml | 1 +
15 files changed, 970 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 290a452..2bc65a6 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125
{% endhighlight %}
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Note that any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
+will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to all metrics sent to Datadog. Multiple tags must be separated by commas only (no spaces), e.g. `myflinkapp,prod`
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+{% endhighlight %}
+
## System metrics
By default Flink gathers several metrics that provide deep insights on the current state.
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 9773991..6d8debf 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -202,6 +202,13 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-datadog</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- end optional Flink metrics reporters -->
<!-- start optional Flink libraries -->
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 95218d7..0386b92 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -105,6 +105,13 @@
</file>
<file>
+ <source>../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-metrics-datadog-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
+
+ <file>
<source>../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar</source>
<outputDirectory>opt/</outputDirectory>
<destName>flink-shaded-hadoop2-${project.version}.jar</destName>
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml
new file mode 100644
index 0000000..0d473fc
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-datadog</artifactId>
+ <name>flink-metrics-datadog</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>3.7.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio</artifactId>
+ <version>1.12.0</version>
+ </dependency>
+
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!--
+ Shade this module at package time and relocate the okhttp3 and okio packages
+ under org.apache.flink.shaded so they cannot clash with other versions of
+ these libraries on the classpath.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <!--
+ Attach the shaded jar as an additional artifact instead of replacing the
+ main one; the flink-dist opt.xml assembly picks up the "-shaded.jar" file.
+ -->
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <!-- combine.children="append": presumably adds to relocations inherited from the parent pom - confirm there -->
+ <relocations combine.children="append">
+ <relocation>
+ <pattern>okhttp3</pattern>
+ <shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>okio</pattern>
+ <shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
new file mode 100644
index 0000000..58abbd6
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.List;
+
+/**
+ * Datadog representation of a Flink {@link Counter}.
+ *
+ * <p>The metric is reported with type {@link MetricType#counter} and its value is the
+ * current count of the wrapped counter at the time it is read.
+ */
+public class DCounter extends DMetric {
+	private final Counter counter;
+
+	public DCounter(Counter c, String metricName, String host, List<String> tags) {
+		super(MetricType.counter, metricName, host, tags);
+		this.counter = c;
+	}
+
+	/**
+	 * Returns the current count of the wrapped counter.
+	 *
+	 * <p>Do not change the visibility of this method. The value is deliberately not
+	 * serialized as a JSON property of its own (the abstract declaration in
+	 * {@link DMetric} is marked {@code @JsonIgnore}); it only reaches Datadog through
+	 * {@link DMetric#getPoints()}.
+	 */
+	@Override
+	public Number getMetricValue() {
+		final long currentCount = counter.getCount();
+		return currentCount;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
new file mode 100644
index 0000000..8deb117
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+
+import org.apache.flink.metrics.Gauge;
+
+import java.util.List;
+
+/**
+ * Datadog representation of a Flink {@link Gauge}.
+ *
+ * <p>The metric is reported with type {@link MetricType#gauge}. Only gauges whose value
+ * is a {@link Number} can be reported; reading any other value fails in
+ * {@link #getMetricValue()}.
+ */
+public class DGauge extends DMetric {
+	private final Gauge<Number> gauge;
+
+	public DGauge(Gauge<Number> g, String metricName, String host, List<String> tags) {
+		super(MetricType.gauge, metricName, host, tags);
+		this.gauge = g;
+	}
+
+	/**
+	 * Returns the current value of the wrapped gauge.
+	 *
+	 * <p>Do not change the visibility of this method. The value is deliberately not
+	 * serialized as a JSON property of its own (the abstract declaration in
+	 * {@link DMetric} is marked {@code @JsonIgnore}); it only reaches Datadog through
+	 * {@link DMetric#getPoints()}.
+	 */
+	@Override
+	public Number getMetricValue() {
+		// Throws if the underlying gauge does not actually hold a Number;
+		// DatadogHttpReporter#report() relies on that to drop such gauges.
+		final Number currentValue = gauge.getValue();
+		return currentValue;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
new file mode 100644
index 0000000..181a00c
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Datadog representation of a Flink {@link Meter}.
+ *
+ * <p>Only the rate of the meter is reported, as a {@link MetricType#gauge}, because of
+ * the Datadog HTTP API's limited support for meters.
+ */
+public class DMeter extends DMetric {
+	private final Meter meter;
+
+	public DMeter(Meter m, String metricName, String host, List<String> tags) {
+		// Datadog has no meter type here, so the rate is sent as a gauge.
+		super(MetricType.gauge, metricName, host, tags);
+		this.meter = m;
+	}
+
+	/**
+	 * Returns the current rate of the wrapped meter.
+	 */
+	@Override
+	public Number getMetricValue() {
+		final double currentRate = meter.getRate();
+		return currentRate;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
new file mode 100644
index 0000000..3f9d6ff
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class of all metrics that are serialized to JSON and sent to Datadog.
+ *
+ * <p>Each instance is serialized into one entry of the Datadog "series" payload. Fields
+ * that are {@code null} are left out of the JSON.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class DMetric {
+	private static final long MILLIS_TO_SEC = 1000L;
+
+	/*
+	 * The names of the metric/type/host/tags fields and of their getters are part of the
+	 * Datadog-defined JSON format and must not be renamed.
+	 */
+	private final String metric; // Metric name
+	private final MetricType type;
+	private final String host;
+	private final List<String> tags;
+
+	public DMetric(MetricType metricType, String metric, String host, List<String> tags) {
+		this.metric = metric;
+		this.type = metricType;
+		this.host = host;
+		this.tags = tags;
+	}
+
+	public MetricType getType() {
+		return this.type;
+	}
+
+	public String getMetric() {
+		return this.metric;
+	}
+
+	public String getHost() {
+		return this.host;
+	}
+
+	public List<String> getTags() {
+		return this.tags;
+	}
+
+	/**
+	 * Returns the data points of this metric in the shape
+	 * {@code [[timestampInSeconds, value]]}.
+	 *
+	 * <p>Exactly one point is produced per call: the current time paired with the
+	 * current metric value.
+	 */
+	public List<List<Number>> getPoints() {
+		final long timestamp = getUnixEpochTimestamp();
+		final Number value = getMetricValue();
+
+		List<Number> singlePoint = new ArrayList<>();
+		singlePoint.add(timestamp);
+		singlePoint.add(value);
+
+		List<List<Number>> allPoints = new ArrayList<>();
+		allPoints.add(singlePoint);
+		return allPoints;
+	}
+
+	/**
+	 * Returns the current value of the underlying Flink metric.
+	 *
+	 * <p>Excluded from JSON serialization; the value is emitted via {@link #getPoints()}.
+	 */
+	@JsonIgnore
+	public abstract Number getMetricValue();
+
+	/**
+	 * Returns the current wall-clock time in seconds since the Unix epoch.
+	 */
+	public static long getUnixEpochTimestamp() {
+		final long nowMillis = System.currentTimeMillis();
+		return nowMillis / MILLIS_TO_SEC;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
new file mode 100644
index 0000000..fb0bb09
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Container for a batch of metrics, serialized to JSON as the body sent to Datadog.
+ */
+public class DSeries {
+	/*
+	 * The name of the series field and of its getter is part of the Datadog-defined
+	 * JSON format and must not be renamed.
+	 */
+	private final List<DMetric> series = new ArrayList<>();
+
+	/**
+	 * Creates an empty series.
+	 */
+	public DSeries() {
+	}
+
+	/**
+	 * Appends a metric to this batch.
+	 */
+	public void addMetric(DMetric metric) {
+		this.series.add(metric);
+	}
+
+	/**
+	 * Returns the metrics collected so far (the live list, not a copy).
+	 */
+	public List<DMetric> getSeries() {
+		return this.series;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
new file mode 100644
index 0000000..dfbcee1
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog.
+ *
+ * <p>Wraps an {@link OkHttpClient} configured with a fixed timeout and knows the two
+ * Datadog endpoints used by the reporter: one to validate the API key and one to post
+ * a series of metrics.
+ */
+public class DatadogHttpClient {
+	private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s";
+	private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s";
+	private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
+	/** Connect/write/read timeout, in seconds. */
+	private static final int TIMEOUT = 3;
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	private final String seriesUrl;
+	private final String validateUrl;
+	private final OkHttpClient client;
+	private final String apiKey;
+
+	/**
+	 * Creates the client and immediately validates the API key against Datadog.
+	 *
+	 * @param dgApiKey the Datadog API key; must be neither null nor empty
+	 * @throws IllegalArgumentException if the key is null, empty, or rejected by Datadog
+	 * @throws IllegalStateException if Datadog could not be contacted to validate the key
+	 */
+	public DatadogHttpClient(String dgApiKey) {
+		if (dgApiKey == null || dgApiKey.isEmpty()) {
+			throw new IllegalArgumentException("Invalid API key:" + dgApiKey);
+		}
+
+		apiKey = dgApiKey;
+		client = new OkHttpClient.Builder()
+			.connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.readTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.build();
+
+		seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+		validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+		validateApiKey();
+	}
+
+	/**
+	 * Calls the Datadog validate endpoint and fails if the response is not successful.
+	 *
+	 * @throws IllegalArgumentException if Datadog does not accept the API key
+	 * @throws IllegalStateException if the request itself fails with an I/O error
+	 */
+	private void validateApiKey() {
+		Request r = new Request.Builder().url(validateUrl).get().build();
+
+		try {
+			Response response = client.newCall(r).execute();
+			try {
+				if (!response.isSuccessful()) {
+					// NOTE(review): this message contains the API key and may end up in logs.
+					throw new IllegalArgumentException(
+						String.format("API key: %s is invalid", apiKey));
+				}
+			} finally {
+				// Always release the response body and its connection, also when the key is rejected.
+				response.close();
+			}
+		} catch (IOException e) {
+			throw new IllegalStateException("Failed contacting Datadog to validate API key", e);
+		}
+	}
+
+	/**
+	 * Serializes the request's series to JSON and posts it to the Datadog series endpoint.
+	 *
+	 * <p>The response is closed without inspecting its status code.
+	 *
+	 * @param request the batch of metrics to send
+	 * @throws Exception if serialization or the HTTP call fails
+	 */
+	public void send(DatadogHttpReporter.DatadogHttpRequest request) throws Exception {
+		String postBody = serialize(request.getSeries());
+
+		Request r = new Request.Builder()
+			.url(seriesUrl)
+			.post(RequestBody.create(MEDIA_TYPE, postBody))
+			.build();
+
+		client.newCall(r).execute().close();
+	}
+
+	/**
+	 * Serializes the given object to a JSON string using the shared {@link ObjectMapper}.
+	 *
+	 * @param obj the object to serialize
+	 * @return the JSON representation of {@code obj}
+	 * @throws JsonProcessingException if the object cannot be serialized
+	 */
+	public static String serialize(Object obj) throws JsonProcessingException {
+		return MAPPER.writeValueAsString(obj);
+	}
+
+	/**
+	 * Shuts down the underlying client's dispatcher executor and evicts all pooled connections.
+	 */
+	public void close() {
+		client.dispatcher().executorService().shutdown();
+		client.connectionPool().evictAll();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
new file mode 100644
index 0000000..fcb5c4b
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog.
+ *
+ * <p>Counters, gauges and meters are tracked as they are registered and sent to Datadog
+ * in a single batch on every {@link #report()} call. Histograms are not supported and are
+ * skipped with a warning.
+ *
+ * <p>Variables in the metric's scope are sent to Datadog as tags, except for
+ * {@code <host>}, which is sent as the host of the metric.
+ */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
+	private static final String HOST_VARIABLE = "<host>";
+
+	// Both Flink's Gauge and Meter values are taken as gauge in Datadog
+	private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+	private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+	private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+	private DatadogHttpClient client;
+	private List<String> configTags;
+
+	/** Config key of the Datadog API key. */
+	public static final String API_KEY = "apikey";
+	/** Config key of the comma-separated global tags. */
+	public static final String TAGS = "tags";
+
+	/**
+	 * Registers a newly added metric so that it is included in subsequent reports.
+	 *
+	 * <p>The metric's tags are the globally configured tags followed by the tags derived
+	 * from the group's variables. Histograms and unknown metric types are not registered.
+	 */
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+		final String name = group.getMetricIdentifier(metricName);
+
+		List<String> tags = new ArrayList<>(configTags);
+		tags.addAll(getTagsFromMetricGroup(group));
+		String host = getHostFromMetricGroup(group);
+
+		if (metric instanceof Counter) {
+			Counter c = (Counter) metric;
+			counters.put(c, new DCounter(c, name, host, tags));
+		} else if (metric instanceof Gauge) {
+			// The value type is unknown here; non-Number gauges are dropped in report().
+			Gauge g = (Gauge) metric;
+			gauges.put(g, new DGauge(g, name, host, tags));
+		} else if (metric instanceof Meter) {
+			Meter m = (Meter) metric;
+			// Only consider rate
+			meters.put(m, new DMeter(m, name, host, tags));
+		} else if (metric instanceof Histogram) {
+			LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
+		} else {
+			LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	/**
+	 * Stops reporting the given metric.
+	 */
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+		if (metric instanceof Counter) {
+			counters.remove(metric);
+		} else if (metric instanceof Gauge) {
+			gauges.remove(metric);
+		} else if (metric instanceof Meter) {
+			meters.remove(metric);
+		} else if (metric instanceof Histogram) {
+			// No Histogram is registered
+		} else {
+			LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	/**
+	 * Creates the HTTP client from the configured API key and reads the global tags.
+	 *
+	 * <p>Creating the client validates the API key and throws if it is missing or invalid.
+	 */
+	@Override
+	public void open(MetricConfig config) {
+		client = new DatadogHttpClient(config.getString(API_KEY, null));
+		LOGGER.info("Configured DatadogHttpReporter");
+
+		configTags = getTagsFromConfig(config.getString(TAGS, ""));
+	}
+
+	@Override
+	public void close() {
+		client.close();
+		LOGGER.info("Shut down DatadogHttpReporter");
+	}
+
+	/**
+	 * Collects all registered gauges, counters and meters into one request and sends it.
+	 *
+	 * <p>Gauges whose value cannot be read are removed and no longer reported. A failure
+	 * to send the request is logged and otherwise ignored.
+	 */
+	@Override
+	public void report() {
+		DatadogHttpRequest request = new DatadogHttpRequest();
+
+		for (Map.Entry<Gauge, DGauge> entry : gauges.entrySet()) {
+			DGauge g = entry.getValue();
+			try {
+				// Will throw exception if the Gauge is not of Number type
+				// Flink uses Gauge to store many types other than Number
+				g.getMetricValue();
+				request.addGauge(g);
+			} catch (Exception e) {
+				// Remove that Gauge if it's not of Number type
+				LOGGER.debug("Removing gauge {} because its value could not be read as a Number.", g.getMetric(), e);
+				gauges.remove(entry.getKey());
+			}
+		}
+
+		for (DCounter c : counters.values()) {
+			request.addCounter(c);
+		}
+
+		for (DMeter m : meters.values()) {
+			request.addMeter(m);
+		}
+
+		try {
+			client.send(request);
+		} catch (Exception e) {
+			LOGGER.warn("Failed reporting metrics to Datadog.", e);
+		}
+	}
+
+	/**
+	 * Get config tags from config 'metrics.reporter.dghttp.tags'.
+	 *
+	 * <p>The value is split on commas; surrounding whitespace is trimmed and empty entries
+	 * are skipped, so an unset or empty configuration yields no tags rather than a single
+	 * empty tag.
+	 *
+	 * @param str the raw comma-separated tag string, never null
+	 * @return the configured tags, possibly empty
+	 */
+	private List<String> getTagsFromConfig(String str) {
+		List<String> tags = new ArrayList<>();
+
+		// "".split(",") yields one empty string, so blank entries must be filtered out.
+		for (String rawTag : str.split(",")) {
+			String tag = rawTag.trim();
+			if (!tag.isEmpty()) {
+				tags.add(tag);
+			}
+		}
+
+		return tags;
+	}
+
+	/**
+	 * Get tags from MetricGroup#getAllVariables(), excluding 'host'.
+	 *
+	 * <p>Each variable {@code <name>} with value {@code v} becomes the tag {@code name:v}.
+	 */
+	private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
+		List<String> tags = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : metricGroup.getAllVariables().entrySet()) {
+			if (!entry.getKey().equals(HOST_VARIABLE)) {
+				tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue());
+			}
+		}
+
+		return tags;
+	}
+
+	/**
+	 * Get host from MetricGroup#getAllVariables() if it exists; returns null otherwise.
+	 */
+	private String getHostFromMetricGroup(MetricGroup metricGroup) {
+		return metricGroup.getAllVariables().get(HOST_VARIABLE);
+	}
+
+	/**
+	 * Given "&lt;xxx&gt;", return "xxx" by dropping the first and last character.
+	 */
+	private String getVariableName(String str) {
+		return str.substring(1, str.length() - 1);
+	}
+
+	/**
+	 * Batch of metrics collected during one report; serialized and sent to Datadog via HTTP.
+	 */
+	static class DatadogHttpRequest {
+		private final DSeries series;
+
+		public DatadogHttpRequest() {
+			series = new DSeries();
+		}
+
+		public void addGauge(DGauge gauge) {
+			series.addMetric(gauge);
+		}
+
+		public void addCounter(DCounter counter) {
+			series.addMetric(counter);
+		}
+
+		public void addMeter(DMeter meter) {
+			series.addMetric(meter);
+		}
+
+		public DSeries getSeries() {
+			return series;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
new file mode 100644
index 0000000..97f9b29
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+/**
+ * Metric types supported by Datadog.
+ *
+ * <p>The constant names 'gauge' and 'counter' must not be changed, since they are mapped
+ * to json objects in a Datadog-defined format.
+ */
+public enum MetricType {
+	/** Datadog gauge; also used for the rate of Flink meters. */
+	gauge,
+
+	/** Datadog counter. */
+	counter
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
new file mode 100644
index 0000000..bda5d47
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link DatadogHttpClient}: API key validation and JSON serialization of metrics. */
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+	/** The client constructor is expected to reject an empty or null API key. */
+	public static class TestApiKey {
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithEmptyKey() {
+			new DatadogHttpClient("");
+		}
+
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithNullKey() {
+			new DatadogHttpClient(null);
+		}
+	}
+
+	/** Checks the exact JSON expected for each metric wrapper, with and without a host. */
+	@RunWith(PowerMockRunner.class)
+	@PrepareForTest(DMetric.class)
+	public static class TestSerialization {
+		private static final List<String> TAGS = Arrays.asList("tag1", "tag2");
+
+		private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+		@Before
+		public void mockSystemMillis() {
+			// Stub the static timestamp source so the expected "points" below can hard-code 123.
+			PowerMockito.mockStatic(DMetric.class);
+			PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+		}
+
+		@Test
+		public void serializeGauge() throws JsonProcessingException {
+			DGauge g = new DGauge(new Gauge<Number>() {
+				@Override
+				public Number getValue() {
+					return 1;
+				}
+			}, "testGauge", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testGauge\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeGaugeWithoutHost() throws JsonProcessingException {
+			DGauge g = new DGauge(new Gauge<Number>() {
+				@Override
+				public Number getValue() {
+					return 1;
+				}
+			}, "testGauge", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testGauge\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeCounter() throws JsonProcessingException {
+			DCounter c = new DCounter(new Counter() {
+				@Override
+				public void inc() {}
+
+				@Override
+				public void inc(long n) {}
+
+				@Override
+				public void dec() {}
+
+				@Override
+				public void dec(long n) {}
+
+				@Override
+				public long getCount() {
+					return 1;
+				}
+			}, "testCounter", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeCounterWithoutHost() throws JsonProcessingException {
+			DCounter c = new DCounter(new Counter() {
+				@Override
+				public void inc() {}
+
+				@Override
+				public void inc(long n) {}
+
+				@Override
+				public void dec() {}
+
+				@Override
+				public void dec(long n) {}
+
+				@Override
+				public long getCount() {
+					return 1;
+				}
+			}, "testCounter", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeMeter() throws JsonProcessingException {
+			DMeter m = new DMeter(new Meter() {
+				@Override
+				public void markEvent() {}
+
+				@Override
+				public void markEvent(long n) {}
+
+				@Override
+				public double getRate() {
+					return 1;
+				}
+
+				@Override
+				public long getCount() {
+					return 0;
+				}
+			}, "testMeter", "localhost", TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+
+		@Test
+		public void serializeMeterWithoutHost() throws JsonProcessingException {
+			DMeter m = new DMeter(new Meter() {
+				@Override
+				public void markEvent() {}
+
+				@Override
+				public void markEvent(long n) {}
+
+				@Override
+				public double getRate() {
+					return 1;
+				}
+
+				@Override
+				public long getCount() {
+					return 0;
+				}
+			}, "testMeter", null, TAGS);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 317dde8..e1b66c2 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -40,6 +40,7 @@ under the License.
<module>flink-metrics-graphite</module>
<module>flink-metrics-jmx</module>
<module>flink-metrics-statsd</module>
+ <module>flink-metrics-datadog</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up