You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/06/15 11:51:56 UTC

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/2105

    [FLINK-4074] Make metric reporters less blocking

    - adds a cancel flag to the StatsDReporter, which is checked before sending every metric. set to true when stop() is called.
    - switched from Timer to ScheduledExecutorService, in the hopes of better forced termination.
    - also contains a hotfix preventing exceptions occurring in (un)register to be propagated

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink metrics_blocking_reporter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2105
    
----
commit 47ae329fff1973e3de9c00f0352a984bb1dba17d
Author: zentol <ch...@apache.org>
Date:   2016-06-15T10:23:41Z

    [FLINK-4074] Make metric reporters less blocking

commit d6e9ce495ff420a3d454e1e35fe72507db03ae27
Author: zentol <ch...@apache.org>
Date:   2016-06-15T10:17:22Z

    [hotfix] [metrics] Prevent registration exceptions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2105


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68238993
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    Alright. But not all reporters use the same logic, since we have `Scheduled` and non-scheduled reporters, haven't we?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68200885
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    I guess that one second of delay wouldn't really hurt. That's at least how the `ScheduledReporter` do it.
    
    Btw: Why do we actually replicate the execution logic for the `com.codahale.metrics.ScheduledReporter`. As far as I can tell, we could simply use the internal reporting mechanism. Then we would not have to implement the `Scheduled` interface.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68036583
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -86,11 +88,11 @@ public MetricRegistry(Configuration config) {
     			// by default, create JMX metrics
     			LOG.info("No metrics reporter configured, exposing metrics via JMX");
     			this.reporter = new JMXReporter();
    -			this.timer = null;
    +			this.executor = null;
     		}
     		else {
     			MetricReporter reporter;
    -			java.util.Timer timer;
    +			ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    --- End diff --
    
    good idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68036496
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -170,8 +170,12 @@ public ScopeFormats getScopeFormats() {
     	 * @param group       the group that contains the metric
     	 */
     	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    -		if (reporter != null) {
    -			reporter.notifyOfAddedMetric(metric, metricName, group);
    +		try {
    +			if (reporter != null) {
    +				reporter.notifyOfAddedMetric(metric, metricName, group);
    +			}
    +		} catch (Exception e) {
    +			LOG.error("Error while registering metric.", e);
    --- End diff --
    
    this is simply a fail-safe to ensure that no exceptions are thrown. Generally a reporter shouldn't throw an Exception in these methods, but I'd rather be safe then sorry.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68037781
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    I've thought about that but i figured that would delay the shutdown, which is the primary thing we wanted to solve.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68247987
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    *correction* all reporters _that regularly send out reports_ follow the same logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2105
  
    Good improvements to make the Reporters less blocking @zentol. I had only some minor comments. After addressing them, I think the PR is good to be merged :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68036163
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
    --- End diff --
    
    Same here. If `executor != null`, then we should shut it down.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68037092
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    Would it make sense to first try to stop the executor via `shutdown`, then call `awaitTermination` for some time and if that fails, then try to shut it down via `shutdownNow`. That would give the executor service the possibility to properly shutdown?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68036072
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -86,11 +88,11 @@ public MetricRegistry(Configuration config) {
     			// by default, create JMX metrics
     			LOG.info("No metrics reporter configured, exposing metrics via JMX");
     			this.reporter = new JMXReporter();
    -			this.timer = null;
    +			this.executor = null;
     		}
     		else {
     			MetricReporter reporter;
    -			java.util.Timer timer;
    +			ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    --- End diff --
    
    Why not instantiating the `ScheduledExecutorService` when we know that the `MetricReporter` is of type `Scheduled`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2105
  
    merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68036113
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
    --- End diff --
    
    `executor` should be shutdown here, I would assume.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68035265
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -170,8 +170,12 @@ public ScopeFormats getScopeFormats() {
     	 * @param group       the group that contains the metric
     	 */
     	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
    -		if (reporter != null) {
    -			reporter.notifyOfAddedMetric(metric, metricName, group);
    +		try {
    +			if (reporter != null) {
    +				reporter.notifyOfAddedMetric(metric, metricName, group);
    +			}
    +		} catch (Exception e) {
    +			LOG.error("Error while registering metric.", e);
    --- End diff --
    
    Why do we add the try-catch here? `notifyOfAddedMetric` does not declare thrown exceptions. Thus we try to catch unchecked exceptions here? Which unchecked exceptions did you encounter here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2105
  
    @tillrohrmann Addressed all your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68035356
  
    --- Diff: flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---
    @@ -53,6 +53,8 @@
     //	public static final String ARG_CONVERSION_RATE = "rateConversion";
     //	public static final String ARG_CONVERSION_DURATION = "durationConversion";
     
    +	private boolean cancel = false;
    --- End diff --
    
    Maybe we could rename the variable to `closed` or `running`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2105: [FLINK-4074] Make metric reporters less blocking

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2105
  
    Very good work @zentol.
    
    +1 for merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2105#discussion_r68201901
  
    --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
    @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
     
     				if (reporter instanceof Scheduled) {
     					LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
    -					long millis = timeunit.toMillis(period);
     					
    -					timer = new java.util.Timer("Periodic Metrics Reporter", true);
    -					timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
    +					executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
     				}
     				else {
    -					timer = null;
    +					executor = null;
     				}
     			}
     			catch (Throwable t) {
     				reporter = new JMXReporter();
    -				timer = null;
    +				executor = null;
     				LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
     			}
     
     			this.reporter = reporter;
    -			this.timer = timer;
    +			this.executor = executor;
     		}
     	}
     
     	/**
     	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
     	 */
     	public void shutdown() {
    -		if (timer != null) {
    -			timer.cancel();
    -		}
     		if (reporter != null) {
     			try {
     				reporter.close();
     			} catch (Throwable t) {
     				LOG.warn("Metrics reporter did not shut down cleanly", t);
     			}
     		}
    +		if (executor != null) {
    +			executor.shutdownNow();
    --- End diff --
    
    so that all reporters use the same logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---