You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/26 10:17:04 UTC

flink git commit: [FLINK-3950] Implement MeterView

Repository: flink
Updated Branches:
  refs/heads/master 307eae6e2 -> 1db14fc06


[FLINK-3950] Implement MeterView

This closes #2443.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db14fc0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db14fc0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db14fc0

Branch: refs/heads/master
Commit: 1db14fc06198e5519c7e71c418f5277bdc2104fc
Parents: 307eae6
Author: zentol <ch...@apache.org>
Authored: Wed Aug 31 15:47:17 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 26 12:16:49 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/metrics/MeterView.java     | 90 +++++++++++++++++++
 .../java/org/apache/flink/metrics/View.java     | 31 +++++++
 .../org/apache/flink/metrics/MeterViewTest.java | 93 +++++++++++++++++++
 .../flink/runtime/metrics/MetricRegistry.java   | 21 +++--
 .../flink/runtime/metrics/ViewUpdater.java      | 95 ++++++++++++++++++++
 5 files changed, 325 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
new file mode 100644
index 0000000..40dd39f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A MeterView provides an average rate of events per second over a given time period.
+ * 
+ * The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event.
+ * Instead, a history of counts is maintained that is updated in regular intervals by a background thread. From this
+ * history a rate is derived on demand, which represents the average rate of events over the given time span. If the
+ * rate is never requested there is thus no overhead for the computation of the rate.
+ * 
+ * Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes.
+ * The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}.
+ * A high value in turn increases memory-consumption since a longer history has to be maintained.
+ * 
+ * The events are counted by a {@link Counter}.
+ */
+public class MeterView implements Meter, View {
+	/** The underlying counter maintaining the count */
+	private final Counter counter;
+	/** The time-span over which the average is calculated */
+	private final int timeSpanInSeconds;
+	/** Circular array containing the history of values */
+	private final long[] values;
+	/** The index in the array for the current time */
+	private int time = 0;
+
+	/** Signals whether a rate was already calculated for the current time-frame */
+	private boolean updateRate = false;
+	/** The last rate we computed */
+	private double currentRate = 0;
+
+	public MeterView(int timeSpanInSeconds) {
+		this(new SimpleCounter(), timeSpanInSeconds);
+	}
+
+	public MeterView(Counter counter, int timeSpanInSeconds) {
+		this.counter = counter;
+		this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS);
+		this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
+	}
+
+	@Override
+	public void markEvent() {
+		this.counter.inc();
+	}
+
+	@Override
+	public void markEvent(long n) {
+		this.counter.inc(n);
+	}
+
+	@Override
+	public long getCount() {
+		return counter.getCount();
+	}
+
+	@Override
+	public double getRate() {
+		if (updateRate) {
+			final int time = this.time;
+			currentRate =  ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
+			updateRate = false;
+		}
+		return currentRate;
+	}
+
+	@Override
+	public void update() {
+		time = (time + 1) % values.length;
+		values[time] = counter.getCount();
+		updateRate = true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
new file mode 100644
index 0000000..1780130
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * An interface for metrics which should be updated in regular intervals by a background thread.
+ */
+public interface View {
+	/** The interval in which metrics are updated */
+	int UPDATE_INTERVAL_SECONDS = 5;
+
+	/**
+	 * This method will be called regularly to update the metric.
+	 */
+	void update();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
new file mode 100644
index 0000000..8ba298f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MeterViewTest {
+	@Test
+	public void testGetCount() {
+		Counter c = new SimpleCounter();
+		c.inc(5);
+		Meter m = new MeterView(c, 60);
+
+		assertEquals(5, m.getCount());
+	}
+
+	@Test
+	public void testMarkEvent() {
+		Counter c = new SimpleCounter();
+		Meter m = new MeterView(c, 60);
+
+		assertEquals(0, m.getCount());
+		m.markEvent();
+		assertEquals(1, m.getCount());
+		m.markEvent(2);
+		assertEquals(3, m.getCount());
+	}
+
+	@Test
+	public void testGetRate() {
+		Counter c = new SimpleCounter();
+		MeterView m = new MeterView(c, 60);
+
+		// values = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+		for (int x = 0; x < 12; x++) {
+			m.markEvent(10);
+			m.update();
+		}
+		// values = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
+		assertEquals(2.0, m.getRate(), 0.1); // 120 - 0 / 60
+
+		for (int x = 0; x < 12; x++) {
+			m.markEvent(10);
+			m.update();
+		}
+		// values = [130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 120]
+		assertEquals(2.0, m.getRate(), 0.1); // 240 - 120 / 60
+
+		for (int x = 0; x < 6; x++) {
+			m.markEvent(20);
+			m.update();
+		}
+		// values = [280, 300, 320, 340, 360, 180, 190, 200, 210, 220, 230, 240, 260]
+		assertEquals(3.0, m.getRate(), 0.1); // 360 - 180 / 60
+
+		for (int x = 0; x < 6; x++) {
+			m.markEvent(20);
+			m.update();
+		}
+		// values = [280, 300, 320, 340, 360, 380, 400, 420, 440, 460, 480, 240, 260]
+		assertEquals(4.0, m.getRate(), 0.1); // 480 - 240 / 60
+
+		for (int x = 0; x < 6; x++) {
+			m.update();
+		}
+		// values = [480, 480, 480, 480, 360, 380, 400, 420, 440, 460, 480, 480, 480]
+		assertEquals(2.0, m.getRate(), 0.1); // 480 - 360 / 60
+
+		for (int x = 0; x < 6; x++) {
+			m.update();
+		}
+		// values = [480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480]
+		assertEquals(0.0, m.getRate(), 0.1); // 480 - 480 / 60
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 219927d..b514fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -54,6 +55,8 @@ public class MetricRegistry {
 	private ScheduledExecutorService executor;
 	private ActorRef queryService;
 
+	private ViewUpdater viewUpdater;
+
 	private final ScopeFormats scopeFormats;
 	private final char globalDelimiter;
 	private final List<Character> delimiters = new ArrayList<>();
@@ -70,11 +73,12 @@ public class MetricRegistry {
 
 		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
 
+		this.executor = Executors.newSingleThreadScheduledExecutor();
+
 		if (reporterConfigurations.isEmpty()) {
 			// no reporters defined
 			// by default, don't report anything
 			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
-			this.executor = null;
 		} else {
 			// we have some reporters so
 			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
@@ -113,9 +117,6 @@ public class MetricRegistry {
 					reporterInstance.open(metricConfig);
 
 					if (reporterInstance instanceof Scheduled) {
-						if (executor == null) {
-							executor = Executors.newSingleThreadScheduledExecutor();
-						}
 						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
 
 						executor.scheduleWithFixedDelay(
@@ -133,7 +134,6 @@ public class MetricRegistry {
 					this.delimiters.add(delimiterForReporter.charAt(0));
 				}
 				catch (Throwable t) {
-					shutdownExecutor();
 					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
 				}
 			}
@@ -242,6 +242,12 @@ public class MetricRegistry {
 			if (queryService != null) {
 				MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
 			}
+			if (metric instanceof View) {
+				if (viewUpdater == null) {
+					viewUpdater = new ViewUpdater(executor);
+				}
+				viewUpdater.notifyOfAddedView((View) metric);
+			}
 		} catch (Exception e) {
 			LOG.error("Error while registering metric.", e);
 		}
@@ -268,6 +274,11 @@ public class MetricRegistry {
 			if (queryService != null) {
 				MetricQueryService.notifyOfRemovedMetric(queryService, metric);
 			}
+			if (metric instanceof View) {
+				if (viewUpdater != null) {
+					viewUpdater.notifyOfRemovedView((View) metric);
+				}
+			}
 		} catch (Exception e) {
 			LOG.error("Error while registering metric.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
new file mode 100644
index 0000000..e4d0596
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.View;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.View.UPDATE_INTERVAL_SECONDS;
+
+/**
+ * The ViewUpdater is responsible for updating all metrics that implement the {@link View} interface.
+ */
+public class ViewUpdater {
+	private final Set<View> toAdd = new HashSet<>();
+	private final Set<View> toRemove = new HashSet<>();
+
+	private final Object lock = new Object();
+
+	public ViewUpdater(ScheduledExecutorService executor) {
+		executor.scheduleWithFixedDelay(new ViewUpdaterTask(lock, toAdd, toRemove), 5, UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+	}
+
+	/**
+	 * Notifies this ViewUpdater of a new metric that should be regularly updated.
+	 *
+	 * @param view metric that should be regularly updated
+	 */
+	public void notifyOfAddedView(View view) {
+		synchronized (lock) {
+			toAdd.add(view);
+		}
+	}
+
+	/**
+	 * Notifies this ViewUpdater of a metric that should no longer be regularly updated.
+	 *
+	 * @param view metric that should no longer be regularly updated
+	 */
+	public void notifyOfRemovedView(View view) {
+		synchronized (lock) {
+			toRemove.add(view);
+		}
+	}
+
+	/**
+	 * The TimerTask doing the actual updating.
+	 */
+	private static class ViewUpdaterTask extends TimerTask {
+		private final Object lock;
+		private final Set<View> views;
+		private final Set<View> toAdd;
+		private final Set<View> toRemove;
+
+		private ViewUpdaterTask(Object lock, Set<View> toAdd, Set<View> toRemove) {
+			this.lock = lock;
+			this.views = new HashSet<>();
+			this.toAdd = toAdd;
+			this.toRemove = toRemove;
+		}
+
+		@Override
+		public void run() {
+			for (View toUpdate : this.views) {
+				toUpdate.update();
+			}
+
+			synchronized (lock) {
+				views.addAll(toAdd);
+				toAdd.clear();
+				views.removeAll(toRemove);
+				toRemove.clear();
+			}
+		}
+	}
+}