You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 11:08:41 UTC

[1/2] flink git commit: [FLINK-4074] Make metric reporters less blocking

Repository: flink
Updated Branches:
  refs/heads/master 9487fcbfb -> 19ff8db68


[FLINK-4074] Make metric reporters less blocking

This closes #2105


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cdec7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cdec7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cdec7d

Branch: refs/heads/master
Commit: 56cdec7d81a53f9ea9578ca274c149f9758b5ffd
Parents: 9487fcb
Author: zentol <ch...@apache.org>
Authored: Wed Jun 15 12:23:41 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:08:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    | 39 ++++++++++++--------
 .../flink/metrics/statsd/StatsDReporter.java    |  9 +++++
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index b3422e1..0abcdec 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +62,7 @@ public class MetricRegistry {
 	static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
 	
 	private final MetricReporter reporter;
-	private final java.util.Timer timer;
+	private final ScheduledExecutorService executor;
 
 	private final ScopeFormats scopeFormats;
 
@@ -86,12 +88,11 @@ public class MetricRegistry {
 			// by default, create JMX metrics
 			LOG.info("No metrics reporter configured, exposing metrics via JMX");
 			this.reporter = new JMXReporter();
-			this.timer = null;
+			this.executor = null;
 		}
 		else {
 			MetricReporter reporter;
-			java.util.Timer timer;
-			
+			ScheduledExecutorService executor = null;
 			try {
 				String configuredPeriod = config.getString(KEY_METRICS_REPORTER_INTERVAL, null);
 				TimeUnit timeunit = TimeUnit.SECONDS;
@@ -117,24 +118,20 @@ public class MetricRegistry {
 				reporter.open(reporterConfig);
 
 				if (reporter instanceof Scheduled) {
+					executor = Executors.newSingleThreadScheduledExecutor();
 					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
-					long millis = timeunit.toMillis(period);
 					
-					timer = new java.util.Timer("Periodic Metrics Reporter", true);
-					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
-				}
-				else {
-					timer = null;
+					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
 				}
 			}
 			catch (Throwable t) {
 				reporter = new JMXReporter();
-				timer = null;
+				shutdownExecutor();
 				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
 			}
 
 			this.reporter = reporter;
-			this.timer = timer;
+			this.executor = executor;
 		}
 	}
 
@@ -142,9 +139,6 @@ public class MetricRegistry {
 	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
 	 */
 	public void shutdown() {
-		if (timer != null) {
-			timer.cancel();
-		}
 		if (reporter != null) {
 			try {
 				reporter.close();
@@ -152,6 +146,21 @@ public class MetricRegistry {
 				LOG.warn("Metrics reporter did not shut down cleanly", t);
 			}
 		}
+		shutdownExecutor();
+	}
+	
+	private void shutdownExecutor() {
+		if (executor != null) {
+			executor.shutdown();
+
+			try {
+				if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+			}
+		}
 	}
 
 	public ScopeFormats getScopeFormats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/56cdec7d/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index ae57f55..087a265 100644
--- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -53,6 +53,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 //	public static final String ARG_CONVERSION_RATE = "rateConversion";
 //	public static final String ARG_CONVERSION_DURATION = "durationConversion";
 
+	private boolean closed = false;
+
 	private DatagramSocket socket;
 	private InetSocketAddress address;
 
@@ -81,6 +83,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
 	@Override
 	public void close() {
+		closed = true;
 		if (socket != null && !socket.isClosed()) {
 			socket.close();
 		}
@@ -95,10 +98,16 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 		// operator creation and shutdown
 		try {
 			for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
+				if (closed) {
+					return;
+				}
 				reportGauge(entry.getValue(), entry.getKey());
 			}
 
 			for (Map.Entry<Counter, String> entry : counters.entrySet()) {
+				if (closed) {
+					return;
+				}
 				reportCounter(entry.getValue(), entry.getKey());
 			}
 		}


[2/2] flink git commit: [hotfix] [metrics] Prevent registration exceptions

Posted by ch...@apache.org.
[hotfix] [metrics] Prevent registration exceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19ff8db6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19ff8db6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19ff8db6

Branch: refs/heads/master
Commit: 19ff8db6847be65c60659e5f6e371fb9dacf1160
Parents: 56cdec7
Author: zentol <ch...@apache.org>
Authored: Wed Jun 15 12:17:22 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 13:08:24 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/metrics/MetricRegistry.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19ff8db6/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index 0abcdec..f3402e9 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -179,8 +179,12 @@ public class MetricRegistry {
 	 * @param group       the group that contains the metric
 	 */
 	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
-		if (reporter != null) {
-			reporter.notifyOfAddedMetric(metric, metricName, group);
+		try {
+			if (reporter != null) {
+				reporter.notifyOfAddedMetric(metric, metricName, group);
+			}
+		} catch (Exception e) {
+			LOG.error("Error while registering metric.", e);
 		}
 	}
 
@@ -192,8 +196,12 @@ public class MetricRegistry {
 	 * @param group       the group that contains the metric
 	 */
 	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
-		if (reporter != null) {
-			reporter.notifyOfRemovedMetric(metric, metricName, group);
+		try {
+			if (reporter != null) {
+				reporter.notifyOfRemovedMetric(metric, metricName, group);
+			}
+		} catch (Exception e) {
+			LOG.error("Error while registering metric.", e);
 		}
 	}