You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/20 09:33:51 UTC
[flink] branch release-1.11 updated:
[FLINK-16611][metrics][datadog] Send report in chunks
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new eead39d [FLINK-16611][metrics][datadog] Send report in chunks
eead39d is described below
commit eead39dda060c9f91c145aadc57cc8177fa23242
Author: Steve Whelan <sw...@gmail.com>
AuthorDate: Wed May 20 05:31:45 2020 -0400
[FLINK-16611][metrics][datadog] Send report in chunks
---
docs/monitoring/metrics.md | 2 ++
docs/monitoring/metrics.zh.md | 2 ++
.../org/apache/flink/metrics/datadog/DCounter.java | 1 +
.../org/apache/flink/metrics/datadog/DMetric.java | 3 +++
.../org/apache/flink/metrics/datadog/DSeries.java | 4 ++++
.../flink/metrics/datadog/DatadogHttpReporter.java | 28 +++++++++++++++-------
6 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 67cf385..8e53273 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -770,6 +770,7 @@ Parameters:
- `proxyHost` - (optional) The proxy host to use when sending to Datadog.
- `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
+- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.
Example configuration:
@@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dghttp.dataCenter: US
+metrics.reporter.dghttp.maxMetricsPerRequest: 2000
{% endhighlight %}
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 8a59edf..da249d8 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -770,6 +770,7 @@ Parameters:
- `proxyHost` - (optional) The proxy host to use when sending to Datadog.
- `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
+- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.
Example configuration:
@@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dghttp.dataCenter: US
+metrics.reporter.dghttp.maxMetricsPerRequest: 2000
{% endhighlight %}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
index 549787c..1427923 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -52,6 +52,7 @@ public class DCounter extends DMetric {
return difference;
}
+ @Override
public void ackReport() {
lastReportCount = currentReportCount;
}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
index 75a4525..1b87435 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -78,4 +78,7 @@ public abstract class DMetric {
@JsonIgnore
public abstract Number getMetricValue();
+
+ public void ackReport() {
+ }
}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
index 139d189..fb631f2 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -35,6 +35,10 @@ public class DSeries {
series = new ArrayList<>();
}
+ public DSeries(List<DMetric> series) {
+ this.series = series;
+ }
+
public void addGauge(DGauge gauge) {
series.add(gauge);
}
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index d32323b..7e569eb 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -56,6 +56,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
private DatadogHttpClient client;
private List<String> configTags;
+ private int maxMetricsPerRequestValue;
private final Clock clock = () -> System.currentTimeMillis() / 1000L;
@@ -64,6 +65,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
public static final String PROXY_PORT = "proxyPort";
public static final String DATA_CENTER = "dataCenter";
public static final String TAGS = "tags";
+ public static final String MAX_METRICS_PER_REQUEST = "maxMetricsPerRequest";
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
@@ -113,6 +115,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
String proxyHost = config.getString(PROXY_HOST, null);
Integer proxyPort = config.getInteger(PROXY_PORT, 8080);
String rawDataCenter = config.getString(DATA_CENTER, "US");
+ maxMetricsPerRequestValue = config.getInteger(MAX_METRICS_PER_REQUEST, 2000);
DataCenter dataCenter = DataCenter.valueOf(rawDataCenter);
String tags = config.getString(TAGS, "");
@@ -120,7 +123,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
configTags = getTagsFromConfig(tags);
- LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}", tags, proxyHost, proxyPort, dataCenter);
+ LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}, maxMetricsPerRequest={}", tags, proxyHost, proxyPort, dataCenter, maxMetricsPerRequestValue);
}
@Override
@@ -137,14 +140,21 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
counters.values().forEach(request::addCounter);
meters.values().forEach(request::addMeter);
- try {
- client.send(request);
- counters.values().forEach(DCounter::ackReport);
- LOGGER.debug("Reported series with size {}.", request.getSeries().size());
- } catch (SocketTimeoutException e) {
- LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
- } catch (Exception e) {
- LOGGER.warn("Failed reporting metrics to Datadog.", e);
+ int totalMetrics = request.getSeries().size();
+ int fromIndex = 0;
+ while (fromIndex < totalMetrics) {
+ int toIndex = Math.min(fromIndex + maxMetricsPerRequestValue, totalMetrics);
+ try {
+ DSeries chunk = new DSeries(request.getSeries().subList(fromIndex, toIndex));
+ client.send(chunk);
+ chunk.getSeries().forEach(DMetric::ackReport);
+ LOGGER.debug("Reported series with size {}.", chunk.getSeries().size());
+ } catch (SocketTimeoutException e) {
+ LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
+ } catch (Exception e) {
+ LOGGER.warn("Failed reporting metrics to Datadog.", e);
+ }
+ fromIndex = toIndex;
}
}