You are viewing a plain text version of this content. (The canonical link that accompanied this notice was not preserved in the plain-text copy.)
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/27 11:39:26 UTC

[9/9] flink git commit: [FLINK-1501] Add metrics library for monitoring TaskManagers

[FLINK-1501] Add metrics library for monitoring TaskManagers

This closes #421


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1f8b07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1f8b07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1f8b07

Branch: refs/heads/master
Commit: 2d1f8b07cc45359e290160cbc076a86229bcd158
Parents: 6b9cee3
Author: Robert Metzger <rm...@apache.org>
Authored: Sat Feb 7 11:33:31 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Mar 27 11:38:10 2015 +0100

----------------------------------------------------------------------
 LICENSE                                         |    1 +
 .../flink/configuration/ConfigConstants.java    |    3 +
 flink-runtime/pom.xml                           |   21 +
 .../apache/flink/runtime/instance/Instance.java |   13 +
 .../flink/runtime/instance/InstanceManager.java |   52 +-
 .../jobmanager/web/SetupInfoServlet.java        |    4 +
 .../src/main/resources/log4j.properties         |   24 +
 .../resources/web-docs-infoserver/analyze.html  |    2 -
 .../web-docs-infoserver/blank-page.html         |    1 -
 .../web-docs-infoserver/configuration.html      |    1 -
 .../web-docs-infoserver/css/bootstrap.css.map   |    1 +
 .../web-docs-infoserver/css/rickshaw.min.css    |    1 +
 .../resources/web-docs-infoserver/history.html  |    1 -
 .../resources/web-docs-infoserver/index.html    |    1 -
 .../web-docs-infoserver/js/bootstrap.min.js     |    6 -
 .../web-docs-infoserver/js/d3.layout.min.js     |    1 +
 .../resources/web-docs-infoserver/js/d3.min.js  |    2 +
 .../web-docs-infoserver/js/flot/jquery.flot.js  | 2599 ------------------
 .../js/flot/jquery.flot.resize.js               |   60 -
 .../js/flot/jquery.flot.tooltip.min.js          |   12 -
 .../js/jquery.flot.categories.min.js            |   44 -
 .../web-docs-infoserver/js/jquery.flot.min.js   |   29 -
 .../web-docs-infoserver/js/jquery.flot.stack.js |  188 --
 .../js/jquery.flot.time.min.js                  |    9 -
 .../resources/web-docs-infoserver/js/jquery.js  |  154 --
 .../web-docs-infoserver/js/rickshaw.min.js      |    3 +
 .../web-docs-infoserver/js/taskmanager.js       |  340 ++-
 .../web-docs-infoserver/taskmanagers.html       |   71 +-
 .../flink/runtime/jobmanager/JobManager.scala   |    4 +-
 .../runtime/messages/TaskManagerMessages.scala  |    3 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   46 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   28 +-
 .../runtime/instance/InstanceManagerTest.java   |   12 +-
 .../flink/test/util/AbstractTestBase.java       |    2 +-
 .../test/util/MultipleProgramsTestBase.java     |    4 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   50 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   12 +-
 .../flink/test/web/WebFrontendITCase.java       |  114 +
 .../src/test/resources/log4j-test.properties    |    2 +-
 flink-yarn-tests/pom.xml                        |    5 +
 .../flink/yarn/YARNSessionFIFOITCase.java       |   36 +-
 pom.xml                                         |    5 +
 42 files changed, 733 insertions(+), 3234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 85d0d85..dc81413 100644
--- a/LICENSE
+++ b/LICENSE
@@ -228,6 +228,7 @@ The Apache Flink project bundles the following files under the MIT License:
  - normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
  - Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
  - D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
+ - Rickshaw (https://github.com/shutterstock/rickshaw) - Copyright (C) 2011-2013 by Shutterstock Images, LLC
 
 All rights reserved.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44f146d..ace2ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -615,6 +615,9 @@ public final class ConfigConstants {
 	 * Sets the number of local task managers
 	 */
 	public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
+
+
+	public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
 	
 	// ------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 4654c60..05f22a5 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -34,6 +34,10 @@ under the License.
 
 	<packaging>jar</packaging>
 
+    <properties>
+        <metrics.version>3.1.0</metrics.version>
+    </properties>
+
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -146,6 +150,23 @@ under the License.
 			</exclusions>
 		</dependency>
 
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+            <version>${metrics.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-json</artifactId>
+            <version>${metrics.version}</version>
+        </dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d921ff4..e27b7ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -66,10 +66,13 @@ public class Instance {
 	
 	/** Time when last heat beat has been received from the task manager running on this taskManager. */
 	private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
+
+	private byte[] lastMetricsReport;
 	
 	/** Flag marking the instance as alive or as dead. */
 	private volatile boolean isDead;
 
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -170,6 +173,14 @@ public class Instance {
 		this.lastReceivedHeartBeat = System.currentTimeMillis();
 	}
 
+	public void setMetricsReport(byte[] lastMetricsReport) {
+		this.lastMetricsReport = lastMetricsReport;
+	}
+
+	public byte[] getLastMetricsReport() {
+		return lastMetricsReport;
+	}
+
 	/**
 	 * Checks whether the last heartbeat occurred within the last {@code n} milliseconds
 	 * before the given timestamp {@code now}.
@@ -332,4 +343,6 @@ public class Instance {
 		return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
 				numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 2ee41da..c1800bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -49,13 +49,13 @@ public class InstanceManager {
 
 	/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
 	private final Map<ActorRef, Instance> registeredHostsByConnection;
-	
+
 	/** Set of hosts that were present once and have died */
 	private final Set<ActorRef> deadHosts;
-	
+
 	/** Listeners that want to be notified about availability and disappearance of instances */
 	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
-	
+
 	/** The total number of task slots that the system has */
 	private int totalNumberOfAliveTaskSlots;
 
@@ -65,7 +65,7 @@ public class InstanceManager {
 	// ------------------------------------------------------------------------
 	// Constructor and set-up
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates an new instance manager.
 	 */
@@ -85,7 +85,7 @@ public class InstanceManager {
 			for (Instance i : this.registeredHostsById.values()) {
 				i.markDead();
 			}
-			
+
 			this.registeredHostsById.clear();
 			this.registeredHostsByConnection.clear();
 			this.deadHosts.clear();
@@ -93,16 +93,16 @@ public class InstanceManager {
 		}
 	}
 
-	public boolean reportHeartBeat(InstanceID instanceId) {
+	public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport) {
 		if (instanceId == null) {
 			throw new IllegalArgumentException("InstanceID may not be null.");
 		}
-		
+
 		synchronized (this.lock) {
 			if (this.isShutdown) {
 				return false;
 			}
-			
+
 			Instance host = registeredHostsById.get(instanceId);
 
 			if (host == null){
@@ -115,6 +115,7 @@ public class InstanceManager {
 			}
 
 			host.reportHeartBeat();
+			host.setMetricsReport(lastMetricsReport);
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received heartbeat from TaskManager " + host);
@@ -124,20 +125,19 @@ public class InstanceManager {
 		}
 	}
 
-	public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo,
-										HardwareDescription resources, int numberOfSlots){
+	public InstanceID registerTaskManager(ActorRef taskManager, InstanceConnectionInfo connectionInfo, HardwareDescription resources, int numberOfSlots){
 		synchronized(this.lock){
 			if (this.isShutdown) {
 				throw new IllegalStateException("InstanceManager is shut down.");
 			}
-			
+
 			Instance prior = registeredHostsByConnection.get(taskManager);
 			if (prior != null) {
 				LOG.info("Registration attempt from TaskManager at " + taskManager.path() +
 						". This connection is already registered under ID " + prior.getId());
 				return null;
 			}
-			
+
 			boolean wasDead = this.deadHosts.remove(taskManager);
 			if (wasDead) {
 				LOG.info("Registering TaskManager at " + taskManager.path() +
@@ -148,25 +148,25 @@ public class InstanceManager {
 			do {
 				id = new InstanceID();
 			} while (registeredHostsById.containsKey(id));
-			
-			
+
+
 			Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots);
-			
+
 			registeredHostsById.put(id, host);
 			registeredHostsByConnection.put(taskManager, host);
-			
+
 			totalNumberOfAliveTaskSlots += numberOfSlots;
-			
+
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
 						connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
 			}
 
 			host.reportHeartBeat();
-			
+
 			// notify all listeners (for example the scheduler)
 			notifyNewInstance(host);
-			
+
 			return id;
 		}
 	}
@@ -202,7 +202,7 @@ public class InstanceManager {
 	public int getTotalNumberOfSlots() {
 		return this.totalNumberOfAliveTaskSlots;
 	}
-	
+
 	public Collection<Instance> getAllRegisteredInstances() {
 		synchronized (this.lock) {
 			// return a copy (rather than a Collections.unmodifiable(...) wrapper), such that
@@ -218,21 +218,21 @@ public class InstanceManager {
 	public Instance getRegisteredInstance(ActorRef ref) {
 		return registeredHostsByConnection.get(ref);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void addInstanceListener(InstanceListener listener) {
 		synchronized (this.instanceListeners) {
 			this.instanceListeners.add(listener);
 		}
 	}
-	
+
 	public void removeInstanceListener(InstanceListener listener) {
 		synchronized (this.instanceListeners) {
 			this.instanceListeners.remove(listener);
 		}
 	}
-	
+
 	private void notifyNewInstance(Instance instance) {
 		synchronized (this.instanceListeners) {
 			for (InstanceListener listener : this.instanceListeners) {
@@ -245,7 +245,7 @@ public class InstanceManager {
 			}
 		}
 	}
-	
+
 	private void notifyDeadInstance(Instance instance) {
 		synchronized (this.instanceListeners) {
 			for (InstanceListener listener : this.instanceListeners) {
@@ -257,4 +257,4 @@ public class InstanceManager {
 			}
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 1610b97..4e028d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -157,6 +157,10 @@ public class SetupInfoServlet extends HttpServlet {
 					objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
 					objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
 					objInner.put("instanceID", instance.getId());
+					byte[] report = instance.getLastMetricsReport();
+					if(report != null) {
+						objInner.put("metrics", new JSONObject(new String(report, "utf-8")));
+					}
 					array.put(objInner);
 				} catch (JSONException e) {
 					LOG.warn("Json object creation failed", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/log4j.properties b/flink-runtime/src/main/resources/log4j.properties
new file mode 100644
index 0000000..9912b19
--- /dev/null
+++ b/flink-runtime/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
index 998542e..2f72bd6 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
@@ -46,8 +46,6 @@ under the License.
     <script src="js/bootstrap.js"></script>
     
     <!-- Scripts from Flink -->
-	<script type="text/javascript" src="js/jquery.flot.min.js"></script>
-	<script type="text/javascript" src="js/helpers.js"></script>
 	<script type="text/javascript" src="js/jcanvas.min.js"></script>
     <script type="text/javascript" src="js/timeline.js"></script>
     <script type="text/javascript" src="js/helpers.js"></script>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html b/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
index 855f6ab..60315b6 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/blank-page.html
@@ -41,7 +41,6 @@ under the License.
     <script src="js/bootstrap.js"></script>
     
     <!-- Scripts from Flink -->
-	<script type="text/javascript" src="js/jquery.flot.min.js"></script>
 	<script type="text/javascript" src="js/helpers.js"></script>
 	<script type="text/javascript" src="js/jcanvas.min.js"></script>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1f8b07/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html b/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
index 90d2cfc..480aea4 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/configuration.html
@@ -41,7 +41,6 @@ under the License.
     <script src="js/bootstrap.js"></script>
     
     <!-- Scripts from Flink -->
-	<script type="text/javascript" src="js/jquery.flot.min.js"></script>
 	<script type="text/javascript" src="js/helpers.js"></script>
 	<script type="text/javascript" src="js/configuration.js"></script>
 	<script type="text/javascript" src="js/jcanvas.min.js"></script>