You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/20 09:33:51 UTC

[flink] branch release-1.11 updated: [FLINK-16611][metrics][datadog] Send report in chunks

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new eead39d  [FLINK-16611][metrics][datadog] Send report in chunks
eead39d is described below

commit eead39dda060c9f91c145aadc57cc8177fa23242
Author: Steve Whelan <sw...@gmail.com>
AuthorDate: Wed May 20 05:31:45 2020 -0400

    [FLINK-16611][metrics][datadog] Send report in chunks
---
 docs/monitoring/metrics.md                         |  2 ++
 docs/monitoring/metrics.zh.md                      |  2 ++
 .../org/apache/flink/metrics/datadog/DCounter.java |  1 +
 .../org/apache/flink/metrics/datadog/DMetric.java  |  3 +++
 .../org/apache/flink/metrics/datadog/DSeries.java  |  4 ++++
 .../flink/metrics/datadog/DatadogHttpReporter.java | 28 +++++++++++++++-------
 6 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 67cf385..8e53273 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -770,6 +770,7 @@ Parameters:
 - `proxyHost` - (optional) The proxy host to use when sending to Datadog.
 - `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
 - `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
+- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.
 
 Example configuration:
 
@@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
 metrics.reporter.dghttp.proxyHost: my.web.proxy.com
 metrics.reporter.dghttp.proxyPort: 8080
 metrics.reporter.dghttp.dataCenter: US
+metrics.reporter.dghttp.maxMetricsPerRequest: 2000
 
 {% endhighlight %}
 
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 8a59edf..da249d8 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -770,6 +770,7 @@ Parameters:
 - `proxyHost` - (optional) The proxy host to use when sending to Datadog.
 - `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
 - `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
+- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.
 
 Example configuration:
 
@@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
 metrics.reporter.dghttp.proxyHost: my.web.proxy.com
 metrics.reporter.dghttp.proxyPort: 8080
 metrics.reporter.dghttp.dataCenter: US
+metrics.reporter.dghttp.maxMetricsPerRequest: 2000
 
 {% endhighlight %}
 
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
index 549787c..1427923 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -52,6 +52,7 @@ public class DCounter extends DMetric {
 		return difference;
 	}
 
+	@Override
 	public void ackReport() {
 		lastReportCount = currentReportCount;
 	}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
index 75a4525..1b87435 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -78,4 +78,7 @@ public abstract class DMetric {
 
 	@JsonIgnore
 	public abstract Number getMetricValue();
+
+	public void ackReport() {
+	}
 }
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
index 139d189..fb631f2 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -35,6 +35,10 @@ public class DSeries {
 		series = new ArrayList<>();
 	}
 
+	public DSeries(List<DMetric> series) {
+		this.series = series;
+	}
+
 	public void addGauge(DGauge gauge) {
 		series.add(gauge);
 	}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index d32323b..7e569eb 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -56,6 +56,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 
 	private DatadogHttpClient client;
 	private List<String> configTags;
+	private int maxMetricsPerRequestValue;
 
 	private final Clock clock = () -> System.currentTimeMillis() / 1000L;
 
@@ -64,6 +65,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 	public static final String PROXY_PORT = "proxyPort";
 	public static final String DATA_CENTER = "dataCenter";
 	public static final String TAGS = "tags";
+	public static final String MAX_METRICS_PER_REQUEST = "maxMetricsPerRequest";
 
 	@Override
 	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
@@ -113,6 +115,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 		String proxyHost = config.getString(PROXY_HOST, null);
 		Integer proxyPort = config.getInteger(PROXY_PORT, 8080);
 		String rawDataCenter = config.getString(DATA_CENTER, "US");
+		maxMetricsPerRequestValue = config.getInteger(MAX_METRICS_PER_REQUEST, 2000);
 		DataCenter dataCenter = DataCenter.valueOf(rawDataCenter);
 		String tags = config.getString(TAGS, "");
 
@@ -120,7 +123,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 
 		configTags = getTagsFromConfig(tags);
 
-		LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}", tags, proxyHost, proxyPort, dataCenter);
+		LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}, maxMetricsPerRequest={}", tags, proxyHost, proxyPort, dataCenter, maxMetricsPerRequestValue);
 	}
 
 	@Override
@@ -137,14 +140,21 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
 		counters.values().forEach(request::addCounter);
 		meters.values().forEach(request::addMeter);
 
-		try {
-			client.send(request);
-			counters.values().forEach(DCounter::ackReport);
-			LOGGER.debug("Reported series with size {}.", request.getSeries().size());
-		} catch (SocketTimeoutException e) {
-			LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
-		} catch (Exception e) {
-			LOGGER.warn("Failed reporting metrics to Datadog.", e);
+		int totalMetrics = request.getSeries().size();
+		int fromIndex = 0;
+		while (fromIndex < totalMetrics) {
+			int toIndex = Math.min(fromIndex + maxMetricsPerRequestValue, totalMetrics);
+			try {
+				DSeries chunk = new DSeries(request.getSeries().subList(fromIndex, toIndex));
+				client.send(chunk);
+				chunk.getSeries().forEach(DMetric::ackReport);
+				LOGGER.debug("Reported series with size {}.", chunk.getSeries().size());
+			} catch (SocketTimeoutException e) {
+				LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
+			} catch (Exception e) {
+				LOGGER.warn("Failed reporting metrics to Datadog.", e);
+			}
+			fromIndex = toIndex;
 		}
 	}