You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/26 10:17:04 UTC
flink git commit: [FLINK-3950] Implement MeterView
Repository: flink
Updated Branches:
refs/heads/master 307eae6e2 -> 1db14fc06
[FLINK-3950] Implement MeterView
This closes #2443.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db14fc0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db14fc0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db14fc0
Branch: refs/heads/master
Commit: 1db14fc06198e5519c7e71c418f5277bdc2104fc
Parents: 307eae6
Author: zentol <ch...@apache.org>
Authored: Wed Aug 31 15:47:17 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 26 12:16:49 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/metrics/MeterView.java | 90 +++++++++++++++++++
.../java/org/apache/flink/metrics/View.java | 31 +++++++
.../org/apache/flink/metrics/MeterViewTest.java | 93 +++++++++++++++++++
.../flink/runtime/metrics/MetricRegistry.java | 21 +++--
.../flink/runtime/metrics/ViewUpdater.java | 95 ++++++++++++++++++++
5 files changed, 325 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
new file mode 100644
index 0000000..40dd39f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A MeterView provides an average rate of events per second over a given time period.
+ *
+ * <p>The primary advantage of this class is that the rate is neither updated by the computing thread nor for every
+ * event. Instead, a history of counts is maintained that is updated in regular intervals by a background thread.
+ * From this history a rate is derived on demand, which represents the average rate of events over the given time
+ * span. If the rate is never requested there is thus no overhead for the computation of the rate.
+ *
+ * <p>Setting the time span to a low value reduces memory-consumption and will more accurately report short-term
+ * changes. The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}; smaller (including non-positive)
+ * values are raised to that minimum. A high value in turn increases memory-consumption since a longer history has
+ * to be maintained.
+ *
+ * <p>The events are counted by a {@link Counter}.
+ */
+public class MeterView implements Meter, View {
+	/** The underlying counter maintaining the count. */
+	private final Counter counter;
+	/** The time-span over which the average is calculated; always a positive multiple of the update interval. */
+	private final int timeSpanInSeconds;
+	/** Circular array containing the history of values; one slot more than the number of intervals in the span. */
+	private final long[] values;
+	/** The index in the array for the current time. */
+	private int time = 0;
+
+	/** Signals whether the history changed since the rate was last calculated. */
+	private boolean updateRate = false;
+	/** The last rate we computed. */
+	private double currentRate = 0;
+
+	/**
+	 * Creates a MeterView that counts events with a new {@link SimpleCounter}.
+	 *
+	 * @param timeSpanInSeconds time span over which the rate is averaged
+	 */
+	public MeterView(int timeSpanInSeconds) {
+		this(new SimpleCounter(), timeSpanInSeconds);
+	}
+
+	/**
+	 * Creates a MeterView that derives its rate from the given counter.
+	 *
+	 * @param counter counter that holds the event count
+	 * @param timeSpanInSeconds time span over which the rate is averaged; rounded down to a multiple of
+	 *                          {@link View#UPDATE_INTERVAL_SECONDS}, but never below one interval
+	 */
+	public MeterView(Counter counter, int timeSpanInSeconds) {
+		this.counter = counter;
+		// Round down to a multiple of the update interval, but clamp to one interval: a span of 0 would make
+		// getRate() divide by zero, and a negative span would produce an empty history array.
+		this.timeSpanInSeconds = Math.max(
+			timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS),
+			UPDATE_INTERVAL_SECONDS);
+		this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
+	}
+
+	@Override
+	public void markEvent() {
+		this.counter.inc();
+	}
+
+	@Override
+	public void markEvent(long n) {
+		this.counter.inc(n);
+	}
+
+	@Override
+	public long getCount() {
+		return counter.getCount();
+	}
+
+	/**
+	 * Returns the average number of events per second over the configured time span.
+	 *
+	 * <p>The value is only recomputed if {@link #update()} was called since the last request; otherwise the
+	 * previously computed rate is returned.
+	 */
+	@Override
+	public double getRate() {
+		if (updateRate) {
+			final int newest = this.time;
+			// In the circular history the slot following the newest entry holds the oldest one.
+			final int oldest = (newest + 1) % values.length;
+			currentRate = ((double) (values[newest] - values[oldest]) / timeSpanInSeconds);
+			updateRate = false;
+		}
+		return currentRate;
+	}
+
+	/**
+	 * Advances the history by one slot and stores the current count in it.
+	 */
+	@Override
+	public void update() {
+		time = (time + 1) % values.length;
+		values[time] = counter.getCount();
+		updateRate = true;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
new file mode 100644
index 0000000..1780130
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * An interface for metrics which should be updated in regular intervals by a background thread.
+ *
+ * <p>Implementations are expected to do their periodic bookkeeping in {@link #update()}.
+ */
+public interface View {
+ /** The interval, in seconds, in which metrics are updated. */
+ int UPDATE_INTERVAL_SECONDS = 5;
+
+ /**
+ * This method will be called regularly to update the metric.
+ *
+ * <p>NOTE(review): presumably invoked once every {@link #UPDATE_INTERVAL_SECONDS} seconds; the actual
+ * schedule is determined by the caller.
+ */
+ void update();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
new file mode 100644
index 0000000..8ba298f
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MeterViewTest {
+ @Test
+ public void testGetCount() {
+ Counter c = new SimpleCounter();
+ c.inc(5);
+ Meter m = new MeterView(c, 60);
+
+ assertEquals(5, m.getCount());
+ }
+
+ @Test
+ public void testMarkEvent() {
+ Counter c = new SimpleCounter();
+ Meter m = new MeterView(c, 60);
+
+ assertEquals(0, m.getCount());
+ m.markEvent();
+ assertEquals(1, m.getCount());
+ m.markEvent(2);
+ assertEquals(3, m.getCount());
+ }
+
+ @Test
+ public void testGetRate() {
+ Counter c = new SimpleCounter();
+ MeterView m = new MeterView(c, 60);
+
+ // values = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+ for (int x = 0; x < 12; x++) {
+ m.markEvent(10);
+ m.update();
+ }
+ // values = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
+ assertEquals(2.0, m.getRate(), 0.1); // 120 - 0 / 60
+
+ for (int x = 0; x < 12; x++) {
+ m.markEvent(10);
+ m.update();
+ }
+ // values = [130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 120]
+ assertEquals(2.0, m.getRate(), 0.1); // 240 - 120 / 60
+
+ for (int x = 0; x < 6; x++) {
+ m.markEvent(20);
+ m.update();
+ }
+ // values = [280, 300, 320, 340, 360, 180, 190, 200, 210, 220, 230, 240, 260]
+ assertEquals(3.0, m.getRate(), 0.1); // 360 - 180 / 60
+
+ for (int x = 0; x < 6; x++) {
+ m.markEvent(20);
+ m.update();
+ }
+ // values = [280, 300, 320, 340, 360, 380, 400, 420, 440, 460, 480, 240, 260]
+ assertEquals(4.0, m.getRate(), 0.1); // 480 - 240 / 60
+
+ for (int x = 0; x < 6; x++) {
+ m.update();
+ }
+ // values = [480, 480, 480, 480, 360, 380, 400, 420, 440, 460, 480, 480, 480]
+ assertEquals(2.0, m.getRate(), 0.1); // 480 - 360 / 60
+
+ for (int x = 0; x < 6; x++) {
+ m.update();
+ }
+ // values = [480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480, 480]
+ assertEquals(0.0, m.getRate(), 0.1); // 480 - 480 / 60
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 219927d..b514fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -54,6 +55,8 @@ public class MetricRegistry {
private ScheduledExecutorService executor;
private ActorRef queryService;
+ private ViewUpdater viewUpdater;
+
private final ScopeFormats scopeFormats;
private final char globalDelimiter;
private final List<Character> delimiters = new ArrayList<>();
@@ -70,11 +73,12 @@ public class MetricRegistry {
List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
+ this.executor = Executors.newSingleThreadScheduledExecutor();
+
if (reporterConfigurations.isEmpty()) {
// no reporters defined
// by default, don't report anything
LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
- this.executor = null;
} else {
// we have some reporters so
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
@@ -113,9 +117,6 @@ public class MetricRegistry {
reporterInstance.open(metricConfig);
if (reporterInstance instanceof Scheduled) {
- if (executor == null) {
- executor = Executors.newSingleThreadScheduledExecutor();
- }
LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
executor.scheduleWithFixedDelay(
@@ -133,7 +134,6 @@ public class MetricRegistry {
this.delimiters.add(delimiterForReporter.charAt(0));
}
catch (Throwable t) {
- shutdownExecutor();
LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
}
}
@@ -242,6 +242,12 @@ public class MetricRegistry {
if (queryService != null) {
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
}
+ if (metric instanceof View) {
+ if (viewUpdater == null) {
+ viewUpdater = new ViewUpdater(executor);
+ }
+ viewUpdater.notifyOfAddedView((View) metric);
+ }
} catch (Exception e) {
LOG.error("Error while registering metric.", e);
}
@@ -268,6 +274,11 @@ public class MetricRegistry {
if (queryService != null) {
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
}
+ if (metric instanceof View) {
+ if (viewUpdater != null) {
+ viewUpdater.notifyOfRemovedView((View) metric);
+ }
+ }
} catch (Exception e) {
LOG.error("Error while registering metric.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1db14fc0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
new file mode 100644
index 0000000..e4d0596
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.metrics.View;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.View.UPDATE_INTERVAL_SECONDS;
+
+/**
+ * The ViewUpdater is responsible for updating all metrics that implement the {@link View} interface.
+ *
+ * <p>Added and removed views are buffered in two pending sets guarded by a lock; the periodic task merges them
+ * into its own working set after each round of updates. The pending sets are kept disjoint so that, for any
+ * view, the most recent add/remove request is the one that takes effect.
+ */
+public class ViewUpdater {
+	/** Views waiting to be picked up by the update task. Guarded by {@link #lock}. */
+	private final Set<View> toAdd = new HashSet<>();
+	/** Views waiting to be dropped by the update task. Guarded by {@link #lock}. */
+	private final Set<View> toRemove = new HashSet<>();
+
+	private final Object lock = new Object();
+
+	/**
+	 * Schedules the periodic update task on the given executor.
+	 *
+	 * @param executor executor that runs the update task every {@link View#UPDATE_INTERVAL_SECONDS} seconds
+	 */
+	public ViewUpdater(ScheduledExecutorService executor) {
+		executor.scheduleWithFixedDelay(
+			new ViewUpdaterTask(lock, toAdd, toRemove),
+			UPDATE_INTERVAL_SECONDS,
+			UPDATE_INTERVAL_SECONDS,
+			TimeUnit.SECONDS);
+	}
+
+	/**
+	 * Notifies this ViewUpdater of a new metric that should be regularly updated.
+	 *
+	 * @param view metric that should be regularly updated
+	 */
+	public void notifyOfAddedView(View view) {
+		synchronized (lock) {
+			// Cancel a still-pending removal; otherwise the task would add and immediately remove the view.
+			toRemove.remove(view);
+			toAdd.add(view);
+		}
+	}
+
+	/**
+	 * Notifies this ViewUpdater of a metric that should no longer be regularly updated.
+	 *
+	 * @param view metric that should no longer be regularly updated
+	 */
+	public void notifyOfRemovedView(View view) {
+		synchronized (lock) {
+			// Cancel a still-pending addition so the two pending sets never contain the same view.
+			toAdd.remove(view);
+			toRemove.add(view);
+		}
+	}
+
+	/**
+	 * The TimerTask doing the actual updating.
+	 *
+	 * <p>NOTE(review): the task is run via {@code scheduleWithFixedDelay}, which suppresses all subsequent
+	 * executions once a run throws; an exception escaping {@link View#update()} would therefore stop updates
+	 * for every view. Consider catching and logging per view.
+	 */
+	private static class ViewUpdaterTask extends TimerTask {
+		private final Object lock;
+		/** Views currently being updated; only touched by the task itself. */
+		private final Set<View> views;
+		private final Set<View> toAdd;
+		private final Set<View> toRemove;
+
+		private ViewUpdaterTask(Object lock, Set<View> toAdd, Set<View> toRemove) {
+			this.lock = lock;
+			this.views = new HashSet<>();
+			this.toAdd = toAdd;
+			this.toRemove = toRemove;
+		}
+
+		@Override
+		public void run() {
+			// Update outside the lock so that registering/unregistering never waits on View#update().
+			for (View toUpdate : this.views) {
+				toUpdate.update();
+			}
+
+			// Merge the buffered changes; they take effect from the next run on.
+			synchronized (lock) {
+				views.addAll(toAdd);
+				toAdd.clear();
+				views.removeAll(toRemove);
+				toRemove.clear();
+			}
+		}
+	}
+}