You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 11:08:41 UTC
[1/2] flink git commit: [FLINK-4074] Make metric reporters less blocking
Repository: flink
Updated Branches:
refs/heads/master 9487fcbfb -> 19ff8db68
[FLINK-4074] Make metric reporters less blocking
This closes #2105
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cdec7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cdec7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cdec7d
Branch: refs/heads/master
Commit: 56cdec7d81a53f9ea9578ca274c149f9758b5ffd
Parents: 9487fcb
Author: zentol <ch...@apache.org>
Authored: Wed Jun 15 12:23:41 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:08:16 2016 +0200
----------------------------------------------------------------------
.../apache/flink/metrics/MetricRegistry.java | 39 ++++++++++++--------
.../flink/metrics/statsd/StatsDReporter.java | 9 +++++
2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index b3422e1..0abcdec 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -60,7 +62,7 @@ public class MetricRegistry {
static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
private final MetricReporter reporter;
- private final java.util.Timer timer;
+ private final ScheduledExecutorService executor;
private final ScopeFormats scopeFormats;
@@ -86,12 +88,11 @@ public class MetricRegistry {
// by default, create JMX metrics
LOG.info("No metrics reporter configured, exposing metrics via JMX");
this.reporter = new JMXReporter();
- this.timer = null;
+ this.executor = null;
}
else {
MetricReporter reporter;
- java.util.Timer timer;
-
+ ScheduledExecutorService executor = null;
try {
String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null);
TimeUnit timeunit = TimeUnit.SECONDS;
@@ -117,24 +118,20 @@ public class MetricRegistry {
reporter.open(reporterConfig);
if (reporter instanceof Scheduled) {
+ executor = Executors.newSingleThreadScheduledExecutor();
LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
- long millis = timeunit.toMillis(period);
- timer = new java.util.Timer("Periodic Metrics Reporter", true);
- timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
- }
- else {
- timer = null;
+ executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
}
}
catch (Throwable t) {
reporter = new JMXReporter();
- timer = null;
+ shutdownExecutor();
LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
}
this.reporter = reporter;
- this.timer = timer;
+ this.executor = executor;
}
}
@@ -142,9 +139,6 @@ public class MetricRegistry {
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
public void shutdown() {
- if (timer != null) {
- timer.cancel();
- }
if (reporter != null) {
try {
reporter.close();
@@ -152,6 +146,21 @@ public class MetricRegistry {
LOG.warn("Metrics reporter did not shut down cleanly", t);
}
}
+ shutdownExecutor();
+ }
+
+ private void shutdownExecutor() {
+ if (executor != null) {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ }
+ }
}
public ScopeFormats getScopeFormats() {
http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index ae57f55..087a265 100644
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
// public static final String ARG_CONVERSION_RATE = "rateConversion";
// public static final String ARG_CONVERSION_DURATION = "durationConversion";
+ private boolean closed = false;
+
private DatagramSocket socket;
private InetSocketAddress address;
@@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
@Override
public void close() {
+ closed = true;
if (socket != null && !socket.isClosed()) {
socket.close();
}
@@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
// operator creation and shutdown
try {
for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+ if (closed) {
+ return;
+ }
reportGauge(entry.getValue(), entry.getKey());
}
for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+ if (closed) {
+ return;
+ }
reportCounter(entry.getValue(), entry.getKey());
}
}
[2/2] flink git commit: [hotfix] [metrics] Prevent registration exceptions
Posted by ch...@apache.org.
[hotfix] [metrics] Prevent registration exceptions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19ff8db6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19ff8db6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19ff8db6
Branch: refs/heads/master
Commit: 19ff8db6847be65c60659e5f6e371fb9dacf1160
Parents: 56cdec7
Author: zentol <ch...@apache.org>
Authored: Wed Jun 15 12:17:22 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:08:24 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/metrics/MetricRegistry.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/19ff8db6/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index 0abcdec..f3402e9 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -179,8 +179,12 @@ public class MetricRegistry {
* @param group the group that contains the metric
*/
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
- if (reporter != null) {
- reporter.notifyOfAddedMetric(metric, metricName, group);
+ try {
+ if (reporter != null) {
+ reporter.notifyOfAddedMetric(metric, metricName, group);
+ }
+ } catch (Exception e) {
+ LOG.error("Error while registering metric.", e);
}
}
@@ -192,8 +196,12 @@ public class MetricRegistry {
* @param group the group that contains the metric
*/
public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
- if (reporter != null) {
- reporter.notifyOfRemovedMetric(metric, metricName, group);
+ try {
+ if (reporter != null) {
+ reporter.notifyOfRemovedMetric(metric, metricName, group);
+ }
+ } catch (Exception e) {
+ LOG.error("Error while registering metric.", e);
}
}