You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/11/04 20:58:33 UTC

[5/5] flink git commit: [FLINK-4221] Show metrics in WebFrontend + general improvements

[FLINK-4221] Show metrics in WebFrontend + general improvements

Other included changes:
- Removed Properties tab
- Renamed plan to overview
- Added parallelism to task list

This closes #2724


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a4fc537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a4fc537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a4fc537

Branch: refs/heads/master
Commit: 3a4fc537dcff4961d4b0c7d11347d246d5b4d293
Parents: e480762
Author: Piotr Godek <pi...@gmail.com>
Authored: Tue Apr 19 15:19:16 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Nov 4 20:41:37 2016 +0100

----------------------------------------------------------------------
 .gitignore                                      |     2 +
 NOTICE                                          |     2 +-
 .../examples/socket/SocketWindowWordCount.java  |    24 +-
 .../web-dashboard/app/partials/jobs/job.jade    |     6 +-
 .../app/partials/jobs/job.plan.jade             |     3 +
 .../jobs/job.plan.node-list.metrics.jade        |    47 +
 .../jobs/job.plan.node-list.subtasks.jade       |     4 +-
 .../jobs/job.plan.node-list.taskmanagers.jade   |     4 +-
 .../app/partials/jobs/job.properties.jade       |   118 -
 .../web-dashboard/app/scripts/index.coffee      |    16 +-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   |   157 +-
 .../app/scripts/modules/jobs/jobs.dir.coffee    |    18 +-
 .../app/scripts/modules/jobs/metrics.dir.coffee |   138 +
 .../app/scripts/modules/jobs/metrics.svc.coffee |   206 +
 .../web-dashboard/app/styles/index.styl         |     1 +
 .../web-dashboard/app/styles/metric.styl        |   103 +
 flink-runtime-web/web-dashboard/bower.json      |     5 +-
 .../web-dashboard/web/css/index.css             |    89 +
 .../web-dashboard/web/css/vendor.css            |   677 +-
 flink-runtime-web/web-dashboard/web/js/index.js |   669 +-
 .../web-dashboard/web/js/vendor.js              | 17086 ++++++++++++++++-
 .../web-dashboard/web/partials/jobs/job.html    |     3 +-
 .../web/partials/jobs/job.plan.html             |     1 +
 .../jobs/job.plan.node-list.metrics.html        |    45 +
 .../jobs/job.plan.node-list.overview.html       |    60 -
 .../jobs/job.plan.node-list.subtasks.html       |     4 +-
 .../jobs/job.plan.node-list.taskmanagers.html   |     4 +-
 .../api/graph/StreamingJobGraphGenerator.java   |    10 +-
 28 files changed, 18385 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 9985ffb..e18629d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ out/
 /docs/Gemfile.lock
 /docs/.bundle
 /docs/.rubydeps
+*.ipr
+*.iws

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 613c660..b68ef69 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Flink
-Copyright 2014-2015 The Apache Software Foundation
+Copyright 2014-2016 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index d6cbe87..fe1b6e7 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -30,7 +30,7 @@ import org.apache.flink.util.Collector;
  * Implements a streaming windowed version of the "WordCount" program.
  *
  * This program connects to a server socket and reads strings from the socket.
- * The easiest way to try this out is to open a text sever (at port 12345) 
+ * The easiest way to try this out is to open a text server (at port 12345)
  * using the <i>netcat</i> tool via
  * <pre>
  * nc -l 12345
@@ -39,7 +39,7 @@ import org.apache.flink.util.Collector;
  */
 @SuppressWarnings("serial")
 public class SocketWindowWordCount {
-	
+
 	public static void main(String[] args) throws Exception {
 
 		// the port to connect to
@@ -53,17 +53,17 @@ public class SocketWindowWordCount {
 			System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
 					"into the command line");
 			return;
-		} 
-		
+		}
+
 		// get the execution environment
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// get input data by connecting to the socket
 		DataStream<String> text = env.socketTextStream("localhost", port, "\n");
 
-		// parse the data, group it, window it, and aggregate the counts 
+		// parse the data, group it, window it, and aggregate the counts
 		DataStream<WordWithCount> windowCounts = text
-				
+
 				.flatMap(new FlatMapFunction<String, WordWithCount>() {
 					@Override
 					public void flatMap(String value, Collector<WordWithCount> out) {
@@ -72,10 +72,10 @@ public class SocketWindowWordCount {
 						}
 					}
 				})
-				
+
 				.keyBy("word")
 				.timeWindow(Time.seconds(5), Time.seconds(1))
-				
+
 				.reduce(new ReduceFunction<WordWithCount>() {
 					@Override
 					public WordWithCount reduce(WordWithCount a, WordWithCount b) {
@@ -88,17 +88,17 @@ public class SocketWindowWordCount {
 
 		env.execute("Socket Window WordCount");
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Data type for words with count
 	 */
 	public static class WordWithCount {
-		
+
 		public String word;
 		public long count;
-		
+
 		public WordWithCount() {}
 
 		public WordWithCount(String word, long count) {
@@ -111,4 +111,4 @@ public class SocketWindowWordCount {
 			return word + " : " + count;
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index d80beee..d8b4d4a 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -50,9 +50,8 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
     li(ui-sref-active='active')
-      a(ui-sref=".plan") Plan
+      a(ui-sref=".plan") Overview
 
-    //- li(ui-sref-active='active' ng-if="job['end-time'] > -1")
     li(ui-sref-active='active')
       a(ui-sref=".timeline") Timeline
 
@@ -60,9 +59,6 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
       a(ui-sref=".exceptions") Exceptions
 
     li(ui-sref-active='active')
-      a(ui-sref=".properties") Properties
-
-    li(ui-sref-active='active')
       a(ui-sref=".config") Configuration
 
 #content-inner.has-navbar-main-additional

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index d0576e7..646fdf0 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -25,6 +25,9 @@
         a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks
 
       li(ui-sref-active='active')
+        a(ui-sref=".metrics({nodeid: nodeid})") Metrics
+
+      li(ui-sref-active='active')
         a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers
 
       li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.metrics.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.metrics.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.metrics.jade
new file mode 100644
index 0000000..18efbe9
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.metrics.jade
@@ -0,0 +1,47 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+p.p-info(ng-if="!nodeid")
+  em Select operator in the job graph above to see the metrics
+
+nav.navbar.navbar-default.navbar-secondary-additional.navbar-secondary-additional-2(ng-if="nodeid")
+  .navbar-info {{ vertex.name }}
+  .navbar-info {{ nodeid }}
+
+  .dropup.add-metrics(ng-if="availableMetrics.length")
+    button.btn.btn-default.navbar-btn.dropdown-toggle(type='button', data-toggle='dropdown')
+      | Add metric
+      | &nbsp;
+      span.caret
+    ul.dropdown-menu.dropdown-menu-right.metric-menu
+      li(ng-repeat="metric in availableMetrics track by $index")
+        a(ng-click="addMetric(metric)") {{ metric.id }}
+
+  .dropup.add-metrics(ng-if="!availableMetrics.length")
+    button.btn.btn-default.navbar-btn.dropdown-toggle(type='button', data-toggle='dropdown', disabled='disabled')
+      i No metrics available
+
+.row.metric-row(ng-if="nodeid && metrics.length == 0")
+  p.p-info
+    em No metrics selected
+
+ul.metric-row(ng-if="nodeid && metrics.length > 0" dnd-list="metrics" dnd-drop="dropped(event, index, item, external, type)")
+  li.metric-col(ng-repeat="metric in metrics track by metric.id" dnd-draggable="metric" dnd-dragstart="dragStart()" dnd-dragend="dragEnd()" dnd-canceled="dragEnd()" ng-class="{big: metric.size != 'small'}")
+    metrics-graph(metric="metric" window="window" get-values="getValues(metric.id)" remove-metric="removeMetric(metric)" set-metric-size="setMetricSize")
+
+.clearfix
+

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
index ef9257d..2067256 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.subtasks.jade
@@ -26,6 +26,7 @@ table.table.table-body-hover.table-clickable.table-activable
       th Records received
       th Bytes sent
       th Records sent
+      th Parallelism
       th Tasks
       th Status
 
@@ -43,6 +44,7 @@ table.table.table-body-hover.table-clickable.table-activable
       td {{ v.metrics['read-records'] | number }}
       td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes }}
       td {{ v.metrics['write-records'] | number }}
+      td {{ v.parallelism }}
       td
         .label-group
           bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}
@@ -50,5 +52,5 @@ table.table.table-body-hover.table-clickable.table-activable
       td 
         bs-label(status="{{v.status}}") {{v.status}}
     tr(ng-if="nodeid && v.id == nodeid")
-      td(colspan="10")
+      td(colspan="11")
         div(ng-include=" 'partials/jobs/job.plan.node.subtasks.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
index 2811461..86e5898 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.taskmanagers.jade
@@ -26,6 +26,7 @@ table.table.table-body-hover.table-clickable.table-activable
       th Records received
       th Bytes sent
       th Records sent
+      th Parallelism
       th Tasks
       th Status
 
@@ -43,6 +44,7 @@ table.table.table-body-hover.table-clickable.table-activable
       td {{ v.metrics['read-records'] | number }}
       td(title="{{v.metrics['write-bytes']}} bytes") {{ v.metrics['write-bytes'] | humanizeBytes }}
       td {{ v.metrics['write-records'] | number }}
+      td {{ v.parallelism }}
       td
         .label-group
           bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{v.tasks[status]}}
@@ -50,5 +52,5 @@ table.table.table-body-hover.table-clickable.table-activable
       td
         bs-label(status="{{v.status}}") {{v.status}}
     tr(ng-if="nodeid && v.id == nodeid")
-      td(colspan="10")
+      td(colspan="11")
         div(ng-include=" 'partials/jobs/job.plan.node.taskmanagers.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/partials/jobs/job.properties.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.properties.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.properties.jade
deleted file mode 100644
index 76cbc78..0000000
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.properties.jade
+++ /dev/null
@@ -1,118 +0,0 @@
-//
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
- 
-      http://www.apache.org/licenses/LICENSE-2.0
- 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-
-.canvas-wrapper
-  div.main-canvas(job-plan, plan="plan", jobid="{{jobid}}", set-node="changeNode(nodeid)")
-
-.panel.panel-default(ng-if="node")
-  .panel-heading.clearfix
-    .panel-title
-      | {{ node.description | humanizeText }}
-
-  .panel-body.clean
-    .row
-     .col-sm-6.col-md-4
-       table.table.table-properties(ng-if="node.optimizer_properties.global_properties")
-         thead
-           tr
-             th(colspan="2")
-               | Global Data Properties
-
-         tbody
-           tr(ng-repeat="property in node.optimizer_properties.global_properties")
-             td {{property.name}}
-             td(table-property value="property.value")
-
-       table.table.table-properties(ng-if="node.optimizer_properties.local_properties")
-         thead
-           tr
-             th(colspan="2")
-               | Local Data Properties
-
-         tbody
-           tr(ng-repeat="property in node.optimizer_properties.local_properties")
-             td {{property.name}}
-             td(table-property value="property.value")
-     
-       .visible-xs.visible-sm
-         table.table.table-properties
-           thead
-             tr
-               th(colspan="2")
-                 | Properties
-     
-           tbody
-             tr
-               td Operator
-               td(table-property value="node.operator_strategy")
-     
-             tr
-               td Parallelism
-               td(table-property value="node.parallelism")
-
-
-     .hidden-sm.col-md-4
-       table.table.table-properties
-         thead
-           tr
-             th(colspan="2")
-               | Properties
-
-         tbody
-           tr
-             td Operator
-             td(table-property value="node.operator_strategy")
-
-           tr
-             td Parallelism
-             td(table-property value="node.parallelism")
-
-
-       table.table.table-properties(ng-if="node.optimizer_properties.estimates")
-         thead
-           tr
-             th(colspan="2")
-               | Size Estimates
-
-         tbody
-           tr(ng-repeat="property in node.optimizer_properties.estimates")
-             td {{property.name}}
-             td(table-property value="property.value")
-
-     .col-sm-6.col-md-4
-       .visible-xs.visible-sm
-         table.table.table-properties(ng-if="node.optimizer_properties.estimates")
-           thead
-             tr
-               th(colspan="2")
-                 | Size Estimates
-     
-           tbody
-             tr(ng-repeat="property in node.optimizer_properties.estimates")
-               td {{property.name}}
-               td(table-property value="property.value")
-     
-       table.table.table-properties(ng-if="node.optimizer_properties.costs")
-         thead
-           tr
-             th(colspan="2")
-               | Cost Estimates
-     
-         tbody
-           tr(ng-repeat="property in node.optimizer_properties.costs")
-             td {{property.name}}
-             td(table-property value="property.value")

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index 179e172..ea900d3 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-angular.module('flinkApp', ['ui.router', 'angularMoment'])
+angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
 
 # --------------------------------------
 
@@ -107,6 +107,13 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
         templateUrl: "partials/jobs/job.plan.node-list.subtasks.html"
         controller: 'JobPlanSubtasksController'
 
+  .state "single-job.plan.metrics",
+    url: "/metrics"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.metrics.html"
+        controller: 'JobPlanMetricsController'
+
   .state "single-job.plan.taskmanagers",
     url: "/taskmanagers"
     views:
@@ -155,13 +162,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
         templateUrl: "partials/jobs/job.exceptions.html"
         controller: 'JobExceptionsController'
 
-  .state "single-job.properties",
-    url: "/properties"
-    views:
-      details:
-        templateUrl: "partials/jobs/job.properties.html"
-        controller: 'JobPropertiesController'
-
   .state "single-job.config",
     url: "/config"
     views:

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index 931976d..5b18377 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -42,9 +42,7 @@ angular.module('flinkApp')
 
 # --------------------------------------
 
-.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, $rootScope, flinkConfig, $interval) ->
-  console.log 'SingleJobController'
-
+.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, MetricsService, $rootScope, flinkConfig, $interval) ->
   $scope.jobid = $stateParams.jobid
   $scope.job = null
   $scope.plan = null
@@ -57,6 +55,7 @@ angular.module('flinkApp')
     $scope.job = data
     $scope.plan = data.plan
     $scope.vertices = data.vertices
+    MetricsService.setupMetrics($stateParams.jobid, data.vertices)
 
   refresher = $interval ->
     JobsService.loadJob($stateParams.jobid).then (data) ->
@@ -91,8 +90,6 @@ angular.module('flinkApp')
 # --------------------------------------
 
 .controller 'JobPlanController', ($scope, $state, $stateParams, JobsService) ->
-  console.log 'JobPlanController'
-
   $scope.nodeid = null
   $scope.nodeUnfolded = false
   $scope.stateList = JobsService.stateList()
@@ -106,6 +103,7 @@ angular.module('flinkApp')
       $scope.operatorCheckpointStats = null
 
       $scope.$broadcast 'reload'
+      $scope.$broadcast 'node:change', $scope.nodeid
 
     else
       $scope.nodeid = null
@@ -129,106 +127,92 @@ angular.module('flinkApp')
 # --------------------------------------
 
 .controller 'JobPlanSubtasksController', ($scope, JobsService) ->
-  console.log 'JobPlanSubtasksController'
-
-  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
+  getSubtasks = ->
     JobsService.getSubtasks($scope.nodeid).then (data) ->
       $scope.subtasks = data
 
+  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
+    getSubtasks()
+
   $scope.$on 'reload', (event) ->
-    console.log 'JobPlanSubtasksController'
-    if $scope.nodeid
-      JobsService.getSubtasks($scope.nodeid).then (data) ->
-        $scope.subtasks = data
+    getSubtasks() if $scope.nodeid
 
 # --------------------------------------
 
 .controller 'JobPlanTaskManagersController', ($scope, JobsService) ->
-  console.log 'JobPlanTaskManagersController'
-
-  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
+  getTaskManagers = ->
     JobsService.getTaskManagers($scope.nodeid).then (data) ->
       $scope.taskmanagers = data
 
+  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.st)
+    getTaskManagers()
+
   $scope.$on 'reload', (event) ->
-    console.log 'JobPlanTaskManagersController'
-    if $scope.nodeid
-      JobsService.getTaskManagers($scope.nodeid).then (data) ->
-        $scope.taskmanagers = data
+    getTaskManagers() if $scope.nodeid
 
 # --------------------------------------
 
 .controller 'JobPlanAccumulatorsController', ($scope, JobsService) ->
-  console.log 'JobPlanAccumulatorsController'
-
-  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.accumulators)
+  getAccumulators = ->
     JobsService.getAccumulators($scope.nodeid).then (data) ->
       $scope.accumulators = data.main
       $scope.subtaskAccumulators = data.subtasks
 
+  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.accumulators)
+    getAccumulators()
+
   $scope.$on 'reload', (event) ->
-    console.log 'JobPlanAccumulatorsController'
-    if $scope.nodeid
-      JobsService.getAccumulators($scope.nodeid).then (data) ->
-        $scope.accumulators = data.main
-        $scope.subtaskAccumulators = data.subtasks
+    getAccumulators() if $scope.nodeid
 
 # --------------------------------------
 
 .controller 'JobPlanCheckpointsController', ($scope, JobsService) ->
-  console.log 'JobPlanCheckpointsController'
-
-  # Get the per job stats
-  JobsService.getJobCheckpointStats($scope.jobid).then (data) ->
-    $scope.jobCheckpointStats = data
+  getJobCheckpointStats = ->
+    JobsService.getJobCheckpointStats($scope.jobid).then (data) ->
+      $scope.jobCheckpointStats = data
 
-  # Get the per operator stats
-  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.operatorCheckpointStats)
+  getOperatorCheckpointStats = ->
     JobsService.getOperatorCheckpointStats($scope.nodeid).then (data) ->
       $scope.operatorCheckpointStats = data.operatorStats
       $scope.subtasksCheckpointStats = data.subtasksStats
 
-  $scope.$on 'reload', (event) ->
-    console.log 'JobPlanCheckpointsController'
+  # Get the per job stats
+  getJobCheckpointStats()
 
-    JobsService.getJobCheckpointStats($scope.jobid).then (data) ->
-      $scope.jobCheckpointStats = data
+  # Get the per operator stats
+  if $scope.nodeid and (!$scope.vertex or !$scope.vertex.operatorCheckpointStats)
+    getOperatorCheckpointStats()
 
-    if $scope.nodeid
-      JobsService.getOperatorCheckpointStats($scope.nodeid).then (data) ->
-        $scope.operatorCheckpointStats = data.operatorStats
-        $scope.subtasksCheckpointStats = data.subtasksStats
+  $scope.$on 'reload', (event) ->
+    getJobCheckpointStats()
+    getOperatorCheckpointStats() if $scope.nodeid
 
 # --------------------------------------
 
 .controller 'JobPlanBackPressureController', ($scope, JobsService) ->
-  console.log 'JobPlanBackPressureController'
-  $scope.now = Date.now()
-
-  if $scope.nodeid
-    JobsService.getOperatorBackPressure($scope.nodeid).then (data) ->
-      $scope.backPressureOperatorStats[$scope.nodeid] = data
-
-  $scope.$on 'reload', (event) ->
-    console.log 'JobPlanBackPressureController (relaod)'
+  getOperatorBackPressure = ->
     $scope.now = Date.now()
 
     if $scope.nodeid
       JobsService.getOperatorBackPressure($scope.nodeid).then (data) ->
         $scope.backPressureOperatorStats[$scope.nodeid] = data
 
+  getOperatorBackPressure()
+
+  $scope.$on 'reload', (event) ->
+    getOperatorBackPressure()
+
 # --------------------------------------
 
 .controller 'JobTimelineVertexController', ($scope, $state, $stateParams, JobsService) ->
-  console.log 'JobTimelineVertexController'
+  getVertex = ->
+    JobsService.getVertex($stateParams.vertexId).then (data) ->
+      $scope.vertex = data
 
-  JobsService.getVertex($stateParams.vertexId).then (data) ->
-    $scope.vertex = data
+  getVertex()
 
   $scope.$on 'reload', (event) ->
-    console.log 'JobTimelineVertexController'
-    JobsService.getVertex($stateParams.vertexId).then (data) ->
-      $scope.vertex = data
+    getVertex()
 
 # --------------------------------------
 
@@ -239,8 +223,6 @@ angular.module('flinkApp')
 # --------------------------------------
 
 .controller 'JobPropertiesController', ($scope, JobsService) ->
-  console.log 'JobPropertiesController'
-
   $scope.changeNode = (nodeid) ->
     if nodeid != $scope.nodeid
       $scope.nodeid = nodeid
@@ -251,3 +233,60 @@ angular.module('flinkApp')
     else
       $scope.nodeid = null
       $scope.node = null
+
+# --------------------------------------
+
+.controller 'JobPlanMetricsController', ($scope, JobsService, MetricsService) ->
+  $scope.dragging = false
+  $scope.window = MetricsService.getWindow()
+  $scope.availableMetrics = null
+
+  $scope.$on '$destroy', ->
+    MetricsService.unRegisterObserver()
+
+  loadMetrics = ->
+    JobsService.getVertex($scope.nodeid).then (data) ->
+      $scope.vertex = data
+
+    MetricsService.getAvailableMetrics($scope.jobid, $scope.nodeid).then (data) ->
+      $scope.availableMetrics = data
+      $scope.metrics = MetricsService.getMetricsSetup($scope.jobid, $scope.nodeid).names
+
+      MetricsService.registerObserver($scope.jobid, $scope.nodeid, (data) ->
+        $scope.$broadcast "metrics:data:update", data.timestamp, data.values
+      )
+
+  $scope.dropped = (event, index, item, external, type) ->
+
+    MetricsService.orderMetrics($scope.jobid, $scope.nodeid, item, index)
+    $scope.$broadcast "metrics:refresh", item
+    loadMetrics()
+    false
+
+  $scope.dragStart = ->
+    $scope.dragging = true
+
+  $scope.dragEnd = ->
+    $scope.dragging = false
+
+  $scope.addMetric = (metric) ->
+    MetricsService.addMetric($scope.jobid, $scope.nodeid, metric.id)
+    loadMetrics()
+
+  $scope.removeMetric = (metric) ->
+    MetricsService.removeMetric($scope.jobid, $scope.nodeid, metric)
+    loadMetrics()
+
+  $scope.setMetricSize = (metric, size) ->
+    MetricsService.setMetricSize($scope.jobid, $scope.nodeid, metric, size)
+    loadMetrics()
+
+  $scope.getValues = (metric) ->
+    MetricsService.getValues($scope.jobid, $scope.nodeid, metric)
+
+  $scope.$on 'node:change', (event, nodeid) ->
+    loadMetrics() if !$scope.dragging
+
+  loadMetrics() if $scope.nodeid
+
+# --------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
index f6314ee..5e6b0e2 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
@@ -252,7 +252,7 @@ angular.module('flinkApp')
         'node-iteration'
 
       else
-          'node-normal'
+        'node-normal'
       
     # creates the label of a node, in info is stored, whether it is a special node (like a mirror in an iteration)
     createLabelNode = (el, info, maxW, maxH) ->
@@ -352,18 +352,10 @@ angular.module('flinkApp')
           class: getNodeType(el, "")
 
     createEdge = (g, data, el, existingNodes, pred) ->
-      unless existingNodes.indexOf(pred.id) is -1
-        g.setEdge pred.id, el.id,
-          label: createLabelEdge(pred)
-          labelType: 'html'
-          arrowhead: 'normal'
-
-      else
-        missingNode = searchForNode(data, pred.id)
-        unless !missingNode
-          g.setEdge missingNode.id, el.id,
-            label: createLabelEdge(missingNode)
-            labelType: 'html'
+      g.setEdge pred.id, el.id,
+        label: createLabelEdge(pred)
+        labelType: 'html'
+        arrowhead: 'normal'
 
     loadJsonToDagre = (g, data) ->
       existingNodes = []

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.dir.coffee
new file mode 100644
index 0000000..adfc09f
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.dir.coffee
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+angular.module('flinkApp')
+
+# ----------------------------------------------
+
+# metricsGraph: renders one metric as an nvd3 line chart inside a panel.
+#
+# Isolate scope bindings:
+#   metric        - the metric entry; its `id` is shown as the title and used
+#                   to pick the value out of each update, its `size` drives
+#                   the Small/Big buttons
+#   window        - maximum number of points kept in the chart
+#   removeMetric  - expression invoked when the remove button is clicked
+#   setMetricSize - function called as setMetricSize(metric, size)
+#   getValues     - expression returning the initial list of {x, y} points
+.directive 'metricsGraph', ->
+  template: '<div class="panel panel-default panel-metric">
+               <div class="panel-heading">
+                 <span class="metric-title">{{metric.id}}</span>
+                 <div class="buttons">
+                   <div class="btn-group">
+                     <button type="button" ng-class="[btnClasses, {active: metric.size != \'big\'}]" ng-click="setSize(\'small\')">Small</button>
+                     <button type="button" ng-class="[btnClasses, {active: metric.size == \'big\'}]" ng-click="setSize(\'big\')">Big</button>
+                   </div>
+                   <a title="Remove" class="btn btn-default btn-xs remove" ng-click="removeMetric()"><i class="fa fa-close" /></a>
+                 </div>
+               </div>
+               <div class="panel-body">
+                  <svg />
+               </div>
+             </div>'
+  replace: true
+  scope:
+    metric: "="
+    window: "="
+    removeMetric: "&"
+    setMetricSize: "="
+    getValues: "&"
+
+  link: (scope, element, attrs) ->
+    scope.btnClasses = ['btn', 'btn-default', 'btn-xs']
+
+    # Latest value received for this metric and the single series fed to nvd3.
+    scope.value = null
+    scope.data = [{
+      values: scope.getValues()
+    }]
+
+    scope.options = {
+      x: (d, i) ->
+        d.x
+      y: (d, i) ->
+        d.y
+
+      # x values are timestamps; show them as wall-clock time.
+      xTickFormat: (d) ->
+        d3.time.format('%H:%M:%S')(new Date(d))
+
+      # Find the decimal exponent `pow` with 10^pow <= |d| < 10^(pow+1) and
+      # switch to "<mantissa>E<pow>" notation once the value exceeds 10^6.
+      # Values with |d| < 1 (including 0) never match and are printed as-is.
+      yTickFormat: (d) ->
+        found = false
+        pow = 0
+        step = 1
+        absD = Math.abs(d)
+
+        while !found && pow < 50
+          if Math.pow(10, pow) <= absD && absD < Math.pow(10, pow + step)
+            found = true
+          else
+            pow += step
+
+        if found && pow > 6
+          "#{d / Math.pow(10, pow)}E#{pow}"
+        else
+          "#{d}"
+    }
+
+    # (Re)binds the current data to the <svg> element and redraws the chart.
+    scope.showChart = ->
+      d3.select(element.find("svg")[0])
+      .datum(scope.data)
+      .transition().duration(250)
+      .call(scope.chart)
+
+    scope.chart = nv.models.lineChart()
+      .options(scope.options)
+      .showLegend(false)
+      .margin({
+        top: 15
+        left: 60
+        bottom: 30
+        right: 30
+      })
+
+    scope.chart.yAxis.showMaxMin(false)
+    scope.chart.tooltip.hideDelay(0)
+    scope.chart.tooltip.contentGenerator((obj) ->
+      "<p>#{d3.time.format('%H:%M:%S')(new Date(obj.point.x))} | #{obj.point.y}</p>"
+    )
+
+    # Keep the handle returned by nvd3 so the window resize listener can be
+    # removed when this graph goes away; otherwise every removed graph leaves
+    # a listener behind that keeps its chart alive.
+    resizeHandle = nv.utils.windowResize(scope.chart.update)
+    scope.$on '$destroy', ->
+      resizeHandle?.clear?()
+
+    scope.setSize = (size) ->
+      scope.setMetricSize(scope.metric, size)
+
+    scope.showChart()
+
+    # Append the new sample for this metric, trim the series to `window`
+    # points and redraw.
+    scope.$on 'metrics:data:update', (event, timestamp, data) ->
+      scope.value = parseFloat(data[scope.metric.id])
+
+      scope.data[0].values.push {
+        x: timestamp
+        y: scope.value
+      }
+
+      if scope.data[0].values.length > scope.window
+        scope.data[0].values.shift()
+
+      scope.showChart()
+      scope.chart.clearHighlights()
+      scope.chart.tooltip.hidden(true)
+
+    # The heading clips long titles (overflow: hidden), so expose the full
+    # metric id in a tooltip.
+    element.find(".metric-title").qtip({
+      content: {
+        text: scope.metric.id
+      },
+      position: {
+        my: 'bottom left',
+        at: 'top left'
+      },
+      style: {
+        classes: 'qtip-light qtip-timeline-bar'
+      }
+    })

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.svc.coffee
new file mode 100644
index 0000000..e169d6d
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/metrics.svc.coffee
@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+angular.module('flinkApp')
+
+.service 'MetricsService', ($http, $q, flinkConfig, $interval) ->
+  # Display setup: @metrics[jobid][nodeid] is the list of metrics selected for
+  # a vertex. Loaded from and persisted to localStorage.flinkMetrics
+  # (see setupLS / saveSetup).
+  @metrics = {}
+  # Sample history: @values[jobid][nodeid] is a list of
+  # { timestamp, values } entries appended by saveValue.
+  @values = {}
+  # @watched[jobid] is the list of vertex ids polled by the refresh timer
+  # (filled by setupMetrics).
+  @watched = {}
+  # The single registered listener; its callback receives every new sample
+  # fetched for the (jobid, nodeid) pair it was registered with.
+  @observer = {
+    jobid: null
+    nodeid: null
+    callback: null
+  }
+
+  # Fetches the names of all metrics of one vertex, then their current
+  # values, and hands the resulting sample to the observer if it is
+  # registered for exactly this job and vertex.
+  pollVertex = (jobid, nodeid) =>
+    @getAllAvailableMetrics(jobid, nodeid).then (available) =>
+      metricIds = _.map available, (entry) -> entry.id
+      return if metricIds.length is 0
+
+      @getMetrics(jobid, nodeid, metricIds).then (sample) =>
+        isObserved = jobid == @observer.jobid && nodeid == @observer.nodeid
+        @observer.callback(sample) if isObserved && @observer.callback
+
+  # Polls every watched vertex of every watched job once per
+  # flinkConfig["refresh-interval"].
+  @refresh = $interval ->
+    angular.forEach @watched, (nodeids, jobid) ->
+      angular.forEach nodeids, (nodeid) ->
+        pollVertex(jobid, nodeid)
+  .bind(@), flinkConfig["refresh-interval"]
+
+  # Registers `callback` as the single observer for samples of the given
+  # job/vertex pair, replacing any previous registration.
+  @registerObserver = (jobid, nodeid, callback) ->
+    current = @observer
+    current.jobid = jobid
+    current.nodeid = nodeid
+    current.callback = callback
+
+  # Drops the current observer by replacing it with an empty registration.
+  @unRegisterObserver = ->
+    @observer = { jobid: null, nodeid: null, callback: null }
+
+  # Reloads the display setup from localStorage and (re)builds the list of
+  # watched vertex ids for `jobid` from `vertices`; entries without an id
+  # are ignored.
+  @setupMetrics = (jobid, vertices) ->
+    @setupLS()
+
+    vertexIds = @watched[jobid] = []
+    angular.forEach vertices, (vertex) ->
+      vertexIds.push(vertex.id) if vertex.id
+
+  # Maximum number of samples kept per vertex (see saveValue).
+  @getWindow = -> 100
+
+  # Loads the display setup from localStorage.flinkMetrics into @metrics,
+  # creating the entry from the current @metrics when it does not exist yet.
+  #
+  # A stored value that is not valid JSON, or that does not decode to an
+  # object, would otherwise throw here or break every later
+  # @metrics[jobid] access; in that case the setup is reset to {} and saved.
+  @setupLS = ->
+    @saveSetup() unless localStorage.flinkMetrics?
+
+    try
+      stored = JSON.parse(localStorage.flinkMetrics)
+    catch err
+      stored = null
+
+    if stored? and typeof stored is 'object'
+      @metrics = stored
+    else
+      @metrics = {}
+      @saveSetup()
+
+    @metrics
+
+  # Persists the current display setup as JSON in localStorage.flinkMetrics.
+  @saveSetup = ->
+    serialized = JSON.stringify(@metrics)
+    localStorage.flinkMetrics = serialized
+
+  # Appends one sample to the history of the given job/vertex, creating the
+  # nested containers on first use and dropping the oldest sample once more
+  # than getWindow() samples are stored.
+  @saveValue = (jobid, nodeid, value) ->
+    perJob = (@values[jobid] ?= {})
+    samples = (perJob[nodeid] ?= [])
+
+    samples.push(value)
+    samples.shift() if samples.length > @getWindow()
+
+  # Returns the stored history of one metric as a list of {x, y} points
+  # (x = sample timestamp, y = metric value). Samples that carry no value
+  # for `metricid` are skipped; unknown jobs/vertices yield an empty list.
+  @getValues = (jobid, nodeid, metricid) ->
+    samples = @values[jobid]?[nodeid]
+    return [] unless samples?
+
+    points = []
+    for sample in samples when sample.values[metricid]?
+      points.push { x: sample.timestamp, y: sample.values[metricid] }
+
+    points
+
+  # Makes sure @metrics[jobid][nodeid] exists (as an empty list when new).
+  @setupLSFor = (jobid, nodeid) ->
+    perJob = (@metrics[jobid] ?= {})
+    perJob[nodeid] = [] unless perJob[nodeid]?
+
+  # Adds a metric, initially sized 'small', to the display setup of the
+  # given job/vertex and persists the setup.
+  @addMetric = (jobid, nodeid, metricid) ->
+    @setupLSFor(jobid, nodeid)
+
+    entry = { id: metricid, size: 'small' }
+    @metrics[jobid][nodeid].push(entry)
+
+    @saveSetup()
+
+  # Removes a metric from the display setup of the given job/vertex and
+  # persists the setup. `metric` is matched against the stored entries
+  # directly first, then against the `id` of the stored { id, size } entries.
+  #
+  # Does nothing when no setup exists for the job or vertex; the previous
+  # unguarded @metrics[jobid][nodeid] lookup threw a TypeError for a job
+  # that had no setup yet.
+  @removeMetric = (jobid, nodeid, metric) =>
+    selected = @metrics[jobid]?[nodeid]
+    return unless selected?
+
+    i = selected.indexOf(metric)
+    i = _.findIndex(selected, { id: metric }) if i == -1
+
+    selected.splice(i, 1) if i != -1
+
+    @saveSetup()
+
+  # Sets the display size of a selected metric and persists the setup. The
+  # stored entry (a plain id string or an { id, size } object) is replaced
+  # by { id: metric.id, size: size }.
+  #
+  # Does nothing when no setup exists for the job or vertex; the previous
+  # unguarded @metrics[jobid][nodeid] lookup threw a TypeError for a job
+  # that had no setup yet.
+  @setMetricSize = (jobid, nodeid, metric, size) =>
+    selected = @metrics[jobid]?[nodeid]
+    return unless selected?
+
+    i = selected.indexOf(metric.id)
+    i = _.findIndex(selected, { id: metric.id }) if i == -1
+
+    selected[i] = { id: metric.id, size: size } if i != -1
+
+    @saveSetup()
+
+  # Moves `item` to position `index` in the display setup of the given
+  # job/vertex and persists the setup. `index` refers to the list as it is
+  # before the move, so it is shifted down by one for every removed entry
+  # that was located in front of it.
+  #
+  # Existing entries with the same id are removed walking backwards, so that
+  # splicing never shifts elements that still have to be visited (the list
+  # used to be spliced from inside a forward angular.forEach over itself).
+  @orderMetrics = (jobid, nodeid, item, index) ->
+    @setupLSFor(jobid, nodeid)
+
+    selected = @metrics[jobid][nodeid]
+
+    last = selected.length - 1
+    for k in [last..0] by -1
+      if selected[k].id == item.id
+        selected.splice(k, 1)
+        index -= 1 if k < index
+
+    selected.splice(index, 0, item)
+
+    @saveSetup()
+
+  # Returns { names: [...] } with the metrics selected for the given
+  # job/vertex, each as an { id, size } object; entries stored as a plain id
+  # string are converted to { id, size: "small" }.
+  #
+  # Yields an empty list when nothing has been set up for the job or vertex
+  # (the unguarded lookup used to throw a TypeError for an unknown job).
+  @getMetricsSetup = (jobid, nodeid) =>
+    {
+      names: _.map(@metrics[jobid]?[nodeid], (value) =>
+        if _.isString(value) then { id: value, size: "small" } else value
+      )
+    }
+
+  # Fetches the metrics the backend offers for a vertex and returns a promise
+  # for those that are not yet part of the display setup (neither as a plain
+  # id nor as an { id, size } entry).
+  #
+  # The $http promise is returned directly so that a failed request rejects
+  # the result; the hand-rolled deferred used before was only ever resolved
+  # and left callers waiting forever on errors.
+  @getAvailableMetrics = (jobid, nodeid) =>
+    @setupLSFor(jobid, nodeid)
+
+    $http.get flinkConfig.jobServer + "jobs/" + jobid + "/vertices/" + nodeid + "/metrics"
+    .then (response) =>
+      # Look the setup up again here: @metrics may have been reloaded from
+      # localStorage while the request was in flight.
+      selected = @metrics[jobid]?[nodeid] or []
+
+      results = []
+      angular.forEach response.data, (v, k) ->
+        i = selected.indexOf(v.id)
+        i = _.findIndex(selected, { id: v.id }) if i == -1
+
+        results.push(v) if i == -1
+
+      results
+
+  # Returns a promise for the unfiltered list of metrics the backend offers
+  # for a vertex.
+  #
+  # The $http promise is returned directly so that a failed request rejects
+  # the result instead of leaving it pending forever.
+  @getAllAvailableMetrics = (jobid, nodeid) =>
+    $http.get flinkConfig.jobServer + "jobs/" + jobid + "/vertices/" + nodeid + "/metrics"
+    .then (response) ->
+      response.data
+
+  # Fetches the current values of the given metrics for a vertex, stores
+  # them as a new sample in the history (saveValue) and returns a promise
+  # for that sample: { timestamp, values } with values keyed by metric id.
+  #
+  # Values are parsed with parseFloat so fractional metrics are not
+  # truncated; the graph directive parses its input as a float as well.
+  # The $http promise is returned directly so that a failed request rejects
+  # the result instead of leaving it pending forever.
+  @getMetrics = (jobid, nodeid, metricIds) ->
+    ids = metricIds.join(",")
+
+    $http.get flinkConfig.jobServer + "jobs/" + jobid + "/vertices/" + nodeid + "/metrics?get=" + ids
+    .then (response) =>
+      result = {}
+      angular.forEach response.data, (v, k) ->
+        result[v.id] = parseFloat(v.value)
+
+      newValue = {
+        timestamp: Date.now()
+        values: result
+      }
+      @saveValue(jobid, nodeid, newValue)
+      newValue
+
+  @setupLS()
+
+  @

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/styles/index.styl
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/styles/index.styl b/flink-runtime-web/web-dashboard/app/styles/index.styl
index 7cde98a..29990dc 100644
--- a/flink-runtime-web/web-dashboard/app/styles/index.styl
+++ b/flink-runtime-web/web-dashboard/app/styles/index.styl
@@ -454,6 +454,7 @@ livechart
 @import './job'
 @import './graph'
 @import './timeline'
+@import './metric'
 
 @media (min-width: 1024px) and (max-width: 1279px)
   #sidebar

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/app/styles/metric.styl
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/styles/metric.styl b/flink-runtime-web/web-dashboard/app/styles/metric.styl
new file mode 100644
index 0000000..049c1b7
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/styles/metric.styl
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+// Second additional navbar row inside #content: pulled out to the container
+// edges with negative margins and separated by a bottom border.
+#content
+  .navbar-secondary-additional
+    &.navbar-secondary-additional-2
+      margin: -10px -10px 10px -10px
+      padding: 0
+      border-bottom: 1px solid #e4e4e4
+
+      .navbar-info
+        padding-top: 12px
+        padding-bottom: 12px
+
+      // Right-aligned "add metrics" control.
+      .add-metrics
+        margin-right: 15px
+        float: right
+        .btn
+          margin-top: 5px
+          margin-bottom: 5px
+        a
+          cursor: pointer
+
+      // Cap the metric menu height and let it scroll vertically.
+      .metric-menu
+        max-height: 300px
+        overflow-y: scroll
+
+// Height shared by the metric panels (265px); the row and the drag
+// placeholder below derive their minimum heights from it.
+$metric-row-height = 180px + 85px
+
+// List of metric panels, laid out as floated columns.
+.metric-row
+  margin: 0px
+  min-height: 1 * ($metric-row-height + 10px)
+  padding: 0
+  list-style-type: none
+
+  // One panel takes a third of the row, or the full row when marked "big".
+  .metric-col
+    background-color: transparent
+    width: 33.33%
+    float: left
+
+    &.big
+      width: 100%
+
+    .panel
+      margin-left: 5px
+      margin-right: 5px
+      min-height: $metric-row-height
+      margin-bottom: 10px
+      .panel-body
+        background-color: transparent
+        height: $metric-row-height
+        position: relative
+
+      // Fixed-height heading; overflowing title text is clipped.
+      .panel-heading
+        padding: 0px 10px
+        background-color: transparent
+        height: 41px
+        line-height: 41px
+        position: relative
+        overflow: hidden
+        cursor: pointer
+
+        .metric-title
+          padding: 10px 0
+
+        // Buttons sit on top of the heading's right edge; the white
+        // background covers any title text underneath.
+        .buttons
+          position: absolute
+          top: 0
+          right: 0
+          padding: 0 10px
+          background-color: #ffffff
+
+    // NOTE(review): dndDraggingSource / dndPlaceholder are presumably the
+    // classes set by angular-drag-and-drop-lists - confirm.
+    &.dndDraggingSource
+      display: none
+
+  .dndPlaceholder
+    position: relative
+    background-color: #f0f0f0
+    min-height: $metric-row-height + 40px
+    display: block
+    width: 33.33%
+    float: left
+    margin-bottom: 10px
+    border-radius: 5px
+
+// Adds 5px of horizontal padding on both sides.
+.p-info
+  padding-left: 5px
+  padding-right: 5px

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/bower.json
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/bower.json b/flink-runtime-web/web-dashboard/bower.json
index 506c4ac..445d32a 100644
--- a/flink-runtime-web/web-dashboard/bower.json
+++ b/flink-runtime-web/web-dashboard/bower.json
@@ -1,7 +1,6 @@
 {
   "name": "flink-dashboard",
   "version": "1.0.0",
- 
   "license": "The Apache Software License 2.0",
   "private": true,
   "ignore": [
@@ -21,7 +20,9 @@
     "dagre-d3": "0.4.17",
     "font-awesome": "4.5.0",
     "moment-duration-format": "1.3.0",
-    "qtip2": "2.2.1"
+    "qtip2": "2.2.1",
+    "angular-drag-and-drop-lists": "^1.4.0",
+    "nvd3": "^1.8.4"
   },
   "overrides": {
     "dagre-d3": {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a4fc537/flink-runtime-web/web-dashboard/web/css/index.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/index.css b/flink-runtime-web/web-dashboard/web/css/index.css
index 49449f2..46ad806 100644
--- a/flink-runtime-web/web-dashboard/web/css/index.css
+++ b/flink-runtime-web/web-dashboard/web/css/index.css
@@ -228,6 +228,7 @@
   overflow: hidden;
   padding: 2px 4px;
   font-size: 14px;
+  -webkit-border-radius: 2px;
   border-radius: 2px;
   margin-top: -3px;
 }
@@ -495,6 +496,7 @@ pre {
 }
 .nav-tabs.tabs-vertical li > a {
   margin-right: 0;
+  -webkit-border-radius: 0;
   border-radius: 0;
   border-bottom: none;
   border-left: 2px solid transparent;
@@ -539,6 +541,7 @@ livechart {
   padding-right: 0.4em;
   margin: 0;
   border-right: 1px solid #fff;
+  -webkit-border-radius: 0;
   border-radius: 0;
 }
 .label-group .label.label-black {
@@ -636,6 +639,92 @@ svg.graph .node-label {
   font-size: 14px;
   line-height: 1.4;
 }
+#content .navbar-secondary-additional.navbar-secondary-additional-2 {
+  margin: -10px -10px 10px -10px;
+  padding: 0;
+  border-bottom: 1px solid #e4e4e4;
+}
+#content .navbar-secondary-additional.navbar-secondary-additional-2 .navbar-info {
+  padding-top: 12px;
+  padding-bottom: 12px;
+}
+#content .navbar-secondary-additional.navbar-secondary-additional-2 .add-metrics {
+  margin-right: 15px;
+  float: right;
+}
+#content .navbar-secondary-additional.navbar-secondary-additional-2 .add-metrics .btn {
+  margin-top: 5px;
+  margin-bottom: 5px;
+}
+#content .navbar-secondary-additional.navbar-secondary-additional-2 .add-metrics a {
+  cursor: pointer;
+}
+#content .navbar-secondary-additional.navbar-secondary-additional-2 .metric-menu {
+  max-height: 300px;
+  overflow-y: scroll;
+}
+.metric-row {
+  margin: 0px;
+  min-height: 275px;
+  padding: 0;
+  list-style-type: none;
+}
+.metric-row .metric-col {
+  background-color: transparent;
+  width: 33.33%;
+  float: left;
+}
+.metric-row .metric-col.big {
+  width: 100%;
+}
+.metric-row .metric-col .panel {
+  margin-left: 5px;
+  margin-right: 5px;
+  min-height: 265px;
+  margin-bottom: 10px;
+}
+.metric-row .metric-col .panel .panel-body {
+  background-color: transparent;
+  height: 265px;
+  position: relative;
+}
+.metric-row .metric-col .panel .panel-heading {
+  padding: 0px 10px;
+  background-color: transparent;
+  height: 41px;
+  line-height: 41px;
+  position: relative;
+  overflow: hidden;
+  cursor: pointer;
+}
+.metric-row .metric-col .panel .panel-heading .metric-title {
+  padding: 10px 0;
+}
+.metric-row .metric-col .panel .panel-heading .buttons {
+  position: absolute;
+  top: 0;
+  right: 0;
+  padding: 0 10px;
+  background-color: #fff;
+}
+.metric-row .metric-col.dndDraggingSource {
+  display: none;
+}
+.metric-row .dndPlaceholder {
+  position: relative;
+  background-color: #f0f0f0;
+  min-height: 305px;
+  display: block;
+  width: 33.33%;
+  float: left;
+  margin-bottom: 10px;
+  -webkit-border-radius: 5px;
+  border-radius: 5px;
+}
+.p-info {
+  padding-left: 5px;
+  padding-right: 5px;
+}
 @media (min-width: 1024px) and (max-width: 1279px) {
   #sidebar {
     left: 0;