You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/03/08 14:28:52 UTC

[5/5] flink git commit: [FLINK-3427] [webui] Add watermark tracking

[FLINK-3427] [webui] Add watermark tracking


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d84b65ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d84b65ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d84b65ff

Branch: refs/heads/master
Commit: d84b65ff15876feb3e26dd20beb2e743968502bc
Parents: 7a629fc
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Mar 8 11:34:29 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 8 15:28:41 2017 +0100

----------------------------------------------------------------------
 .../app/partials/jobs/job.plan.jade             |  5 +-
 .../jobs/job.plan.node-list.watermarks.jade     | 36 ++++++++
 .../partials/jobs/job.plan.node.watermarks.jade | 27 ++++++
 .../app/scripts/common/filters.coffee           | 17 ++++
 .../web-dashboard/app/scripts/index.coffee      | 15 +++-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   | 68 +++++++++++++--
 .../app/scripts/modules/jobs/jobs.dir.coffee    | 90 ++++++++++++++------
 7 files changed, 221 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index e84dd04..c33b9a3 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -18,7 +18,7 @@
 split
   .split#canvas
     .canvas-wrapper
-      div.main-canvas(job-plan, plan="plan", jobid="{{jobid}}", set-node="changeNode(nodeid)")
+      div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
 
   .split#job-panel
     .panel.panel-default.panel-multi(ng-if="plan")
@@ -34,6 +34,9 @@ split
             a(ui-sref=".metrics({nodeid: nodeid})") Metrics
 
           li(ui-sref-active='active')
+            a(ui-sref=".watermarks({nodeid: nodeid})") Watermarks
+
+          li(ui-sref-active='active')
             a(ui-sref=".accumulators({nodeid: nodeid})") Accumulators
 
           li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
new file mode 100644
index 0000000..6b4c6a2
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
@@ -0,0 +1,36 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-body-hover.table-clickable.table-activable
+  thead
+    tr
+      th Name
+      th Low Watermark
+      th Parallelism
+      th Status
+
+  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)")
+    tr(ng-if="v.type == 'regular'")
+
+      td.td-long {{ v.name | humanizeText }}
+      td {{ watermarks | lowWatermark:v.id }}
+      td {{ v.parallelism }}
+      td 
+        bs-label(status="{{v.status}}") {{v.status}}
+    tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)")
+      td(colspan="11")
+        div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
new file mode 100644
index 0000000..b406a1c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
@@ -0,0 +1,27 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)")
+  thead
+    tr
+      th id
+      th Watermark
+
+  tbody
+    tr(ng-repeat="watermark in watermarksByNode(nodeid)")
+      td {{ watermark.id }}
+      td {{ watermark.value | parseWatermark }}

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
index 67b02e3..99e12a8 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
@@ -87,3 +87,20 @@ angular.module('flinkApp')
 
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", (watermarksConfig)->
+  (value) ->
+    if value <= watermarksConfig.minValue
+      return 'No Watermark'
+    else
+      return value
+
+.filter "lowWatermark", (watermarksConfig)->
+  (watermarks, nodeid) ->
+    lowWatermark = "No Watermark"
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      values = (watermark.value for watermark in watermarks[nodeid])
+      lowWatermark = Math.min.apply(null, values)
+      if lowWatermark <= watermarksConfig.minValue
+        lowWatermark = "No Watermark"
+    return lowWatermark

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index 95bb356..cbbefab 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -30,12 +30,18 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
 
 .value 'flinkConfig', {
   jobServer: ''
-#  jobServer: 'http://localhost:8081/'
+  # jobServer: 'http://localhost:8081/'
   "refresh-interval": 10000
 }
 
 # --------------------------------------
 
+.value 'watermarksConfig', {
+  minValue: -9223372036854776000
+}
+
+# --------------------------------------
+
 .run (JobsService, MainService, flinkConfig, $interval) ->
   MainService.loadConfig().then (config) ->
     angular.extend flinkConfig, config
@@ -114,6 +120,13 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
         templateUrl: "partials/jobs/job.plan.node-list.metrics.html"
         controller: 'JobPlanMetricsController'
 
+  .state "single-job.plan.watermarks",
+    url: "/watermarks"
+    views:
+      'node-details':
+        templateUrl: "partials/jobs/job.plan.node-list.watermarks.html"
+        controller: 'JobPlanWatermarksController'
+
   .state "single-job.plan.taskmanagers",
     url: "/taskmanagers"
     views:

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index bbb57c5..d18d7e3 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -42,23 +42,18 @@ angular.module('flinkApp')
 
 # --------------------------------------
 
-.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, MetricsService, $rootScope, flinkConfig, $interval) ->
+.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, MetricsService, $rootScope, flinkConfig, $interval, $q, watermarksConfig) ->
   $scope.jobid = $stateParams.jobid
   $scope.job = null
   $scope.plan = null
+  $scope.watermarks = null
+  $scope.lowWatermarks = null
   $scope.vertices = null
   $scope.backPressureOperatorStats = {}
 
-  JobsService.loadJob($stateParams.jobid).then (data) ->
-    $scope.job = data
-    $scope.plan = data.plan
-    $scope.vertices = data.vertices
-    MetricsService.setupMetrics($stateParams.jobid, data.vertices)
-
   refresher = $interval ->
     JobsService.loadJob($stateParams.jobid).then (data) ->
       $scope.job = data
-
       $scope.$broadcast 'reload'
 
   , flinkConfig["refresh-interval"]
@@ -66,6 +61,8 @@ angular.module('flinkApp')
   $scope.$on '$destroy', ->
     $scope.job = null
     $scope.plan = null
+    $scope.watermarks = null
+    $scope.lowWatermarks = null
     $scope.vertices = null
     $scope.backPressureOperatorStats = null
 
@@ -81,6 +78,50 @@ angular.module('flinkApp')
     JobsService.stopJob($stateParams.jobid).then (data) ->
       {}
 
+  JobsService.loadJob($stateParams.jobid).then (data) ->
+    $scope.job = data
+    $scope.vertices = data.vertices
+    $scope.plan = data.plan
+    MetricsService.setupMetrics($stateParams.jobid, data.vertices)
+
+  getWatermarks = (nodes)->
+    # This function uses a promise to resolve watermarks once fetched via the metrics service, since watermarks have to be fetched individually for each node, we have to wait until all API calls have been made before we can resolve the promise. In the end we will have an array of low watermarks for each node: e.g. {somenodeid: [{id: 0, value: -9223372036854776000}], anothernodeid: [{id: 0, value: -9223372036854776000}, {id: 1, value: -9223372036854776000}]}.
+    deferred = $q.defer()
+    watermarks = {}
+    jid = $scope.job.jid
+    angular.forEach nodes, (node, index) =>
+      metricIds = []
+      # for each node, we need to specify which metrics we want to collect, for each subtask, we need to fetch the currentLowWatermark, and each param is formed by concatenating subtask index to '.currentLowWatermark'.
+      for num in [0..node.parallelism - 1]
+        metricIds.push(num + ".currentLowWatermark")
+      MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
+        values = []
+        for key, value of data.values
+          values.push(id: key.replace('.currentLowWatermark', ''), value: value)
+        watermarks[node.id] = values
+        if index >= $scope.plan.nodes.length - 1
+          deferred.resolve(watermarks)
+    deferred.promise
+
+  getLowWatermarks = (watermarks)->
+    lowWatermarks = []
+    for k,v of watermarks
+      minValue = Math.min.apply(null,(watermark.value for watermark in v))
+      lowWatermarks[k] = if minValue <= watermarksConfig.minValue || v.length == 0 then 'No Watermark' else minValue
+    return lowWatermarks
+
+  $scope.$watch 'plan', (newPlan) ->
+    if newPlan
+      getWatermarks(newPlan.nodes).then (data) ->
+        $scope.watermarks = data
+        $scope.lowWatermarks = getLowWatermarks(data)
+
+  $scope.$on 'reload', (event) ->
+    if $scope.plan
+      getWatermarks($scope.plan.nodes).then (data) ->
+        $scope.watermarks = data
+        $scope.lowWatermarks = getLowWatermarks(data)
+
 # --------------------------------------
 
 .controller 'JobPlanController', ($scope, $state, $stateParams, $window, JobsService) ->
@@ -318,3 +359,14 @@ angular.module('flinkApp')
   loadMetrics() if $scope.nodeid
 
 # --------------------------------------
+
+.controller 'JobPlanWatermarksController', ($scope, $filter) ->
+  $scope.hasWatermarks = (nodeid) ->
+    return true if $scope.watermarksByNode(nodeid).length
+
+  $scope.watermarksByNode = (nodeid) ->
+    if $scope.watermarks != null && $scope.watermarks[nodeid] && $scope.watermarks[nodeid].length
+      return $scope.watermarks[nodeid]
+    return []
+
+# --------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
index b67d0bf..950cf06 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
@@ -168,12 +168,14 @@ angular.module('flinkApp')
     return
 
 # ----------------------------------------------
+
 .directive 'split', () -> 
   return compile: (tElem, tAttrs) ->
       Split(tElem.children(), (
         sizes: [50, 50]
         direction: 'vertical'
       ))
+
 # ----------------------------------------------
 
 .directive 'jobPlan', ($timeout) ->
@@ -187,6 +189,7 @@ angular.module('flinkApp')
 
   scope:
     plan: '='
+    lowWatermarks: '='
     setNode: '&'
 
   link: (scope, elem, attrs) ->
@@ -209,6 +212,9 @@ angular.module('flinkApp')
     containerW = elem.width()
     angular.element(elem.children()[0]).width(containerW)
 
+    lastZoomScale = 0
+    lastPosition = 0
+
     scope.zoomIn = ->
       if mainZoom.scale() < 2.99
 
@@ -222,6 +228,9 @@ angular.module('flinkApp')
         # Transform svg
         d3mainSvgG.attr "transform", "translate(" + v1 + "," + v2 + ") scale(" + mainZoom.scale() + ")"
 
+        lastZoomScale = mainZoom.scale()
+        lastPosition = mainZoom.translate()
+
     scope.zoomOut = ->
       if mainZoom.scale() > 0.31
 
@@ -235,6 +244,9 @@ angular.module('flinkApp')
         # Transform svg
         d3mainSvgG.attr "transform", "translate(" + v1 + "," + v2 + ") scale(" + mainZoom.scale() + ")"
 
+        lastZoomScale = mainZoom.scale()
+        lastPosition = mainZoom.translate()
+
     #create a label of an edge
     createLabelEdge = (el) ->
       labelValue = ""
@@ -288,6 +300,7 @@ angular.module('flinkApp')
         # Otherwise add infos
         labelValue += "<h5>" + info + " Node</h5>"  if isSpecialIterationNode(info)
         labelValue += "<h5>Parallelism: " + el.parallelism + "</h5>"  unless el.parallelism is ""
+        labelValue += "<h5>Low Watermark: " + el.lowWatermark + "</h5>"  unless el.lowWatermark is `undefined`
         labelValue += "<h5>Operation: " + shortenString(el.operator_strategy) + "</h5>" unless el.operator is `undefined` or not el.operator_strategy
       # labelValue += "</a>"
       labelValue += "</div>"
@@ -422,43 +435,66 @@ angular.module('flinkApp')
           for j of el.step_function
             return el.step_function[j]  if el.step_function[j].id is nodeID
 
-    drawGraph = (data) ->
-      g = new dagreD3.graphlib.Graph({ multigraph: true, compound: true }).setGraph({
-        nodesep: 70
-        edgesep: 0
-        ranksep: 50
-        rankdir: "LR"
-        marginx: 40
-        marginy: 40
-        })
+    mergeWatermarks = (data, watermarks) ->
+      for k,v of watermarks
+        for node in data.nodes
+          if node.id == k
+            node.lowWatermark = v
+      return data
 
-      loadJsonToDagre(g, data)
+    lastPosition = 0
+    lastZoomScale = 0
 
-      renderer = new dagreD3.render()
-      d3mainSvgG.call(renderer, g)
+    drawGraph = () ->
+      if scope.plan
+        g = new dagreD3.graphlib.Graph({ multigraph: true, compound: true }).setGraph({
+          nodesep: 70
+          edgesep: 0
+          ranksep: 50
+          rankdir: "LR"
+          marginx: 40
+          marginy: 40
+          })
 
-      for i, sg of subgraphs
-        d3mainSvg.select('svg.svg-' + i + ' g').call(renderer, sg)
+        loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.lowWatermarks))
 
-      newScale = 0.5
+        d3mainSvgG.selectAll("*").remove()
 
-      xCenterOffset = Math.floor((angular.element(mainSvgElement).width() - g.graph().width * newScale) / 2)
-      yCenterOffset = Math.floor((angular.element(mainSvgElement).height() - g.graph().height * newScale) / 2)
+        d3mainSvgG.attr("transform", "scale(" + 1 + ")")
 
-      mainZoom.scale(newScale).translate([xCenterOffset, yCenterOffset])
+        renderer = new dagreD3.render()
+        d3mainSvgG.call(renderer, g)
 
-      d3mainSvgG.attr("transform", "translate(" + xCenterOffset + ", " + yCenterOffset + ") scale(" + mainZoom.scale() + ")")
+        for i, sg of subgraphs
+          d3mainSvg.select('svg.svg-' + i + ' g').call(renderer, sg)
 
-      mainZoom.on("zoom", ->
-        ev = d3.event
-        d3mainSvgG.attr "transform", "translate(" + ev.translate + ") scale(" + ev.scale + ")"
-      )
-      mainZoom(d3mainSvg)
+        newScale = 0.5
+
+        xCenterOffset = Math.floor((angular.element(mainSvgElement).width() - g.graph().width * newScale) / 2)
+        yCenterOffset = Math.floor((angular.element(mainSvgElement).height() - g.graph().height * newScale) / 2)
+
+        if lastZoomScale != 0 && lastPosition != 0
+          mainZoom.scale(lastZoomScale).translate(lastPosition)
+          d3mainSvgG.attr("transform", "translate(" + lastPosition + ") scale(" + lastZoomScale + ")")
+        else
+          mainZoom.scale(newScale).translate([xCenterOffset, yCenterOffset])
+          d3mainSvgG.attr("transform", "translate(" + xCenterOffset + ", " + yCenterOffset + ") scale(" + mainZoom.scale() + ")")
 
-      d3mainSvgG.selectAll('.node').on 'click', (d) ->
-        scope.setNode({ nodeid: d })
+        mainZoom.on("zoom", ->
+          ev = d3.event
+          lastZoomScale = ev.scale
+          lastPosition = ev.translate
+          d3mainSvgG.attr "transform", "translate(" + lastPosition + ") scale(" + lastZoomScale + ")"
+        )
+        mainZoom(d3mainSvg)
+
+        d3mainSvgG.selectAll('.node').on 'click', (d) ->
+          scope.setNode({ nodeid: d })
 
     scope.$watch attrs.plan, (newPlan) ->
-      drawGraph(newPlan) if newPlan
+      drawGraph() if newPlan
+
+    scope.$watch attrs.lowWatermarks, (newLowWatermarks) ->
+      drawGraph() if newLowWatermarks && scope.plan
 
     return