You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this extraction.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/27 11:39:26 UTC
[9/9] flink git commit: [FLINK-1501] Add metrics library for monitoring TaskManagers
[FLINK-1501] Add metrics library for monitoring TaskManagers
This closes #421
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1f8b07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1f8b07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1f8b07
Branch: refs/heads/master
Commit: 2d1f8b07cc45359e290160cbc076a86229bcd158
Parents: 6b9cee3
Author: Robert Metzger <rm...@apache.org>
Authored: Sat Feb 7 11:33:31 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Mar 27 11:38:10 2015 +0100
----------------------------------------------------------------------
LICENSE | 1 +
.../flink/configuration/ConfigConstants.java | 3 +
flink-runtime/pom.xml | 21 +
.../apache/flink/runtime/instance/Instance.java | 13 +
.../flink/runtime/instance/InstanceManager.java | 52 +-
.../jobmanager/web/SetupInfoServlet.java | 4 +
.../src/main/resources/log4j.properties | 24 +
.../resources/web-docs-infoserver/analyze.html | 2 -
.../web-docs-infoserver/blank-page.html | 1 -
.../web-docs-infoserver/configuration.html | 1 -
.../web-docs-infoserver/css/bootstrap.css.map | 1 +
.../web-docs-infoserver/css/rickshaw.min.css | 1 +
.../resources/web-docs-infoserver/history.html | 1 -
.../resources/web-docs-infoserver/index.html | 1 -
.../web-docs-infoserver/js/bootstrap.min.js | 6 -
.../web-docs-infoserver/js/d3.layout.min.js | 1 +
.../resources/web-docs-infoserver/js/d3.min.js | 2 +
.../web-docs-infoserver/js/flot/jquery.flot.js | 2599 ------------------
.../js/flot/jquery.flot.resize.js | 60 -
.../js/flot/jquery.flot.tooltip.min.js | 12 -
.../js/jquery.flot.categories.min.js | 44 -
.../web-docs-infoserver/js/jquery.flot.min.js | 29 -
.../web-docs-infoserver/js/jquery.flot.stack.js | 188 --
.../js/jquery.flot.time.min.js | 9 -
.../resources/web-docs-infoserver/js/jquery.js | 154 --
.../web-docs-infoserver/js/rickshaw.min.js | 3 +
.../web-docs-infoserver/js/taskmanager.js | 340 ++-
.../web-docs-infoserver/taskmanagers.html | 71 +-
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../runtime/messages/TaskManagerMessages.scala | 3 +-
.../minicluster/LocalFlinkMiniCluster.scala | 46 +-
.../flink/runtime/taskmanager/TaskManager.scala | 28 +-
.../runtime/instance/InstanceManagerTest.java | 12 +-
.../flink/test/util/AbstractTestBase.java | 2 +-
.../test/util/MultipleProgramsTestBase.java | 4 +-
.../apache/flink/test/util/TestBaseUtils.java | 50 +-
.../test/util/ForkableFlinkMiniCluster.scala | 12 +-
.../flink/test/web/WebFrontendITCase.java | 114 +
.../src/test/resources/log4j-test.properties | 2 +-
flink-yarn-tests/pom.xml | 5 +
.../flink/yarn/YARNSessionFIFOITCase.java | 36 +-
pom.xml | 5 +
42 files changed, 733 insertions(+), 3234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 85d0d85..dc81413 100644
--- a/LICENSE
+++ b/LICENSE
@@ -228,6 +228,7 @@ The Apache Flink project bundles the following files under the MIT License:
- normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
- Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
- D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
+ - Rickshaw (https://github.com/shutterstock/rickshaw) - Copyright (C) 2011-2013 by Shutterstock Images, LLC
All rights reserved.
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44f146d..ace2ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -615,6 +615,9 @@ public final class ConfigConstants {
* Sets the number of local task managers
*/
public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
+
+
+ public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 4654c60..05f22a5 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -34,6 +34,10 @@ under the License.
<packaging>jar</packaging>
+ <properties>
+ <metrics.version>3.1.0</metrics.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -146,6 +150,23 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d921ff4..e27b7ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -66,10 +66,13 @@ public class Instance {
/** Time when last heat beat has been received from the task manager running on this taskManager. */
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
+
+ private byte[] lastMetricsReport;
/** Flag marking the instance as alive or as dead. */
private volatile boolean isDead;
+
// --------------------------------------------------------------------------------------------
/**
@@ -170,6 +173,14 @@ public class Instance {
this.lastReceivedHeartBeat = System.currentTimeMillis();
}
+ public void setMetricsReport(byte[] lastMetricsReport) {
+ this.lastMetricsReport = lastMetricsReport;
+ }
+
+ public byte[] getLastMetricsReport() {
+ return lastMetricsReport;
+ }
+
/**
* Checks whether the last heartbeat occurred within the last {@code n} milliseconds
* before the given timestamp {@code now}.
@@ -332,4 +343,6 @@ public class Instance {
return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 2ee41da..c1800bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -49,13 +49,13 @@ public class InstanceManager {
/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
private final Map<ActorRef, Instance> registeredHostsByConnection;
-
+
/** Set of hosts that were present once and have died */
private final Set<ActorRef> deadHosts;
-
+
/** Listeners that want to be notified about availability and disappearance of instances */
private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
-
+
/** The total number of task slots that the system has */
private int totalNumberOfAliveTaskSlots;
@@ -65,7 +65,7 @@ public class InstanceManager {
// ------------------------------------------------------------------------
// Constructor and set-up
// ------------------------------------------------------------------------
-
+
/**
* Creates an new instance manager.
*/
@@ -85,7 +85,7 @@ public class InstanceManager {
for (Instance i : this.registeredHostsById.values()) {
i.markDead();
}
-
+
this.registeredHostsById.clear();
this.registeredHostsByConnection.clear();
this.deadHosts.clear();
@@ -93,16 +93,16 @@ public class InstanceManager {
}
}
- public boolean reportHeartBeat(InstanceID instanceId) {
+ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport) {
if (instanceId == null) {
throw new IllegalArgumentException("InstanceID may not be null.");
}
-
+
synchronized (this.lock) {
if (this.isShutdown) {
return false;
}
-
+
Instance host = registeredHostsById.get(instanceId);
if (host == null){
@@ -115,6 +115,7 @@ public class InstanceManager {
}
host.reportHeartBeat();
+ host.setMetricsReport(lastMetricsReport);
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from TaskManager " + host);
@@ -124,20 +125,19 @@ public class InstanceManager {
}
}
- public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo,
- HardwareDescription resources, int numberOfSlots){
+ public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo, HardwareDescription resources, int numberOfSlots){
synchronized(this.lock){
if (this.isShutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
-
+
Instance prior = registeredHostsByConnection.get(taskManager);
if (prior != null) {
LOG.info("Registration attempt from TaskManager at " + taskManager.path() +
". This connection is already registered under ID " + prior.getId());
return null;
}
-
+
boolean wasDead = this.deadHosts.remove(taskManager);
if (wasDead) {
LOG.info("Registering TaskManager at " + taskManager.path() +
@@ -148,25 +148,25 @@ public class InstanceManager {
do {
id = new InstanceID();
} while (registeredHostsById.containsKey(id));
-
-
+
+
Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots);
-
+
registeredHostsById.put(id, host);
registeredHostsByConnection.put(taskManager, host);
-
+
totalNumberOfAliveTaskSlots += numberOfSlots;
-
+
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
}
host.reportHeartBeat();
-
+
// notify all listeners (for example the scheduler)
notifyNewInstance(host);
-
+
return id;
}
}
@@ -202,7 +202,7 @@ public class InstanceManager {
public int getTotalNumberOfSlots() {
return this.totalNumberOfAliveTaskSlots;
}
-
+
public Collection<Instance> getAllRegisteredInstances() {
synchronized (this.lock) {
// return a copy (rather than a Collections.unmodifiable(...) wrapper), such that
@@ -218,21 +218,21 @@ public class InstanceManager {
public Instance getRegisteredInstance(ActorRef ref) {
return registeredHostsByConnection.get(ref);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public void addInstanceListener(InstanceListener listener) {
synchronized (this.instanceListeners) {
this.instanceListeners.add(listener);
}
}
-
+
public void removeInstanceListener(InstanceListener listener) {
synchronized (this.instanceListeners) {
this.instanceListeners.remove(listener);
}
}
-
+
private void notifyNewInstance(Instance instance) {
synchronized (this.instanceListeners) {
for (InstanceListener listener : this.instanceListeners) {
@@ -245,7 +245,7 @@ public class InstanceManager {
}
}
}
-
+
private void notifyDeadInstance(Instance instance) {
synchronized (this.instanceListeners) {
for (InstanceListener listener : this.instanceListeners) {
@@ -257,4 +257,4 @@ public class InstanceManager {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 1610b97..4e028d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -157,6 +157,10 @@ public class SetupInfoServlet extends HttpServlet {
objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
objInner.put("instanceID", instance.getId());
+ byte[] report = instance.getLastMetricsReport();
+ if(report != null) {
+ objInner.put("metrics", new JSONObject(new String(report, "utf-8")));
+ }
array.put(objInner);
} catch (JSONException e) {
LOG.warn("Json object creation failed", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/log4j.properties b/flink-runtime/src/main/resources/log4j.properties
new file mode 100644
index 0000000..9912b19
--- /dev/null
+++ b/flink-runtime/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
index 998542e..2f72bd6 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
@@ -46,8 +46,6 @@ under the License.
<script src="js/bootstrap.js"></script>
<!-- Scripts from Flink -->
- <script type="text/javascript" src="js/jquery.flot.min.js"></script>
- <script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
<script type="text/javascript" src="js/timeline.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html b/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
index 855f6ab..60315b6 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
@@ -41,7 +41,6 @@ under the License.
<script src="js/bootstrap.js"></script>
<!-- Scripts from Flink -->
- <script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html b/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
index 90d2cfc..480aea4 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
@@ -41,7 +41,6 @@ under the License.
<script src="js/bootstrap.js"></script>
<!-- Scripts from Flink -->
- <script type="text/javascript" src="js/jquery.flot.min.js"></script>
<script type="text/javascript" src="js/helpers.js"></script>
<script type="text/javascript" src="js/configuration.js"></script>
<script type="text/javascript" src="js/jcanvas.min.js"></script>