You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/03/08 14:28:51 UTC

[4/5] flink git commit: [FLINK-3427] [webui] Refactorings to watermark tracking

[FLINK-3427] [webui] Refactorings to watermark tracking


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ef18f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ef18f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ef18f65

Branch: refs/heads/master
Commit: 4ef18f6597bfde99733f8d4f4a54b90fc943c663
Parents: d84b65f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 7 11:36:50 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 8 15:28:41 2017 +0100

----------------------------------------------------------------------
 .../app/partials/jobs/job.plan.jade             |  2 +-
 .../jobs/job.plan.node-list.watermarks.jade     | 14 +--
 .../partials/jobs/job.plan.node.watermarks.jade | 10 +-
 .../app/scripts/common/filters.coffee           | 16 +---
 .../web-dashboard/app/scripts/index.coffee      |  9 +-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   | 98 ++++++++++++--------
 .../app/scripts/modules/jobs/jobs.dir.coffee    | 17 ++--
 7 files changed, 93 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index c33b9a3..6c4cf0b 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -18,7 +18,7 @@
 split
   .split#canvas
     .canvas-wrapper
-      div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
+      div.main-canvas(job-plan, plan="plan", watermarks="watermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
 
   .split#job-panel
     .panel.panel-default.panel-multi(ng-if="plan")

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
index 6b4c6a2..4605b61 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
@@ -23,14 +23,14 @@ table.table.table-body-hover.table-clickable.table-activable
       th Parallelism
       th Status
 
-  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)")
+  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
     tr(ng-if="v.type == 'regular'")
-
       td.td-long {{ v.name | humanizeText }}
-      td {{ watermarks | lowWatermark:v.id }}
+      td {{ watermarks[v.id]["lowWatermark"] | humanizeWatermark }}
       td {{ v.parallelism }}
-      td 
+      td
         bs-label(status="{{v.status}}") {{v.status}}
-    tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)")
-      td(colspan="11")
-        div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
+    tr(ng-if="nodeid && v.id == nodeid")
+      td(colspan="4")
+        div(ng-show="hasWatermark(v.id)" ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
+        div(ng-show="!hasWatermark(v.id)") No Watermarks

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
index b406a1c..451ccaa 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
@@ -15,13 +15,13 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 
-table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)")
+table.table.table-hover.table-clickable.table-activable.table-inner
   thead
     tr
-      th id
+      th Subtask
       th Watermark
 
   tbody
-    tr(ng-repeat="watermark in watermarksByNode(nodeid)")
-      td {{ watermark.id }}
-      td {{ watermark.value | parseWatermark }}
+    tr(ng-repeat="(subtaskIndex, watermark) in watermarks[nodeid]['watermarks']")
+      td {{ subtaskIndex | increment }}
+      td {{ watermark | humanizeWatermark }}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
index 99e12a8..a3ce508 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee
@@ -88,19 +88,13 @@ angular.module('flinkApp')
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
 
-.filter "parseWatermark", (watermarksConfig)->
+.filter "humanizeWatermark", (watermarksConfig) ->
   (value) ->
-    if value <= watermarksConfig.minValue
+    if isNaN(value) || value <= watermarksConfig.noWatermark
       return 'No Watermark'
     else
       return value
 
-.filter "lowWatermark", (watermarksConfig)->
-  (watermarks, nodeid) ->
-    lowWatermark = "No Watermark"
-    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
-      values = (watermark.value for watermark in watermarks[nodeid])
-      lowWatermark = Math.min.apply(null, values)
-      if lowWatermark <= watermarksConfig.minValue
-        lowWatermark = "No Watermark"
-    return lowWatermark
+.filter "increment", ->
+  (number) ->
+    parseInt(number) + 1

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index cbbefab..52bb075 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -30,14 +30,17 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
 
 .value 'flinkConfig', {
   jobServer: ''
-  # jobServer: 'http://localhost:8081/'
+#  jobServer: 'http://localhost:8081/'
   "refresh-interval": 10000
 }
 
 # --------------------------------------
 
 .value 'watermarksConfig', {
-  minValue: -9223372036854776000
+  # A value of (Java) Long.MIN_VALUE indicates that there is no watermark
+  # available. This is parsed by Javascript as this number. We have it as
+  # a constant here to compare available watermarks against.
+  noWatermark: -9223372036854776000
 }
 
 # --------------------------------------
@@ -52,7 +55,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
       JobsService.listJobs()
     , flinkConfig["refresh-interval"]
 
-
 # --------------------------------------
 
 .config ($uiViewScrollProvider) ->
@@ -125,7 +127,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
     views:
       'node-details':
         templateUrl: "partials/jobs/job.plan.node-list.watermarks.html"
-        controller: 'JobPlanWatermarksController'
 
   .state "single-job.plan.taskmanagers",
     url: "/taskmanagers"

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index d18d7e3..f25c94d 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -46,8 +46,7 @@ angular.module('flinkApp')
   $scope.jobid = $stateParams.jobid
   $scope.job = null
   $scope.plan = null
-  $scope.watermarks = null
-  $scope.lowWatermarks = null
+  $scope.watermarks = {}
   $scope.vertices = null
   $scope.backPressureOperatorStats = {}
 
@@ -61,8 +60,7 @@ angular.module('flinkApp')
   $scope.$on '$destroy', ->
     $scope.job = null
     $scope.plan = null
-    $scope.watermarks = null
-    $scope.lowWatermarks = null
+    $scope.watermarks = {}
     $scope.vertices = null
     $scope.backPressureOperatorStats = null
 
@@ -84,43 +82,80 @@ angular.module('flinkApp')
     $scope.plan = data.plan
     MetricsService.setupMetrics($stateParams.jobid, data.vertices)
 
-  getWatermarks = (nodes)->
-    # This function uses a promise to resolve watermarks once fetched via the metrics service, since watermarks have to be fetched individually for each node, we have to wait until all API calls have been made before we can resolve the promise. In the end we will have an array of low watermarks for each node: e.g. {somenodeid: [{id: 0, value: -9223372036854776000}], anothernodeid: [{id: 0, value: -9223372036854776000}, {id: 1, value: -9223372036854776000}]}.
+  # Asynchronously requests the watermark metrics for the given nodes. The
+  # returned object has the following structure:
+  #
+  # {
+  #    "<nodeId>": {
+  #          "lowWatermark": <lowWatermark>
+  #          "watermarks": {
+  #               0: <watermark for subtask 0>
+  #               ...
+  #               n: <watermark for subtask n>
+  #            }
+  #       }
+  # }
+  #
+  # If no watermark is available, lowWatermark will be NaN and
+  # the watermarks will be empty.
+  getWatermarks = (nodes) ->
+    # Requests the watermarks for a single vertex. Triggers a request
+    # to the Metrics service.
+    requestWatermarkForNode = (node) =>
+      deferred = $q.defer()
+
+      jid = $scope.job.jid
+
+      # Request metrics for each subtask
+      metricIds = (i + ".currentLowWatermark" for i in [0..node.parallelism - 1])
+      MetricsService.getMetrics(jid, node.id, metricIds).then (metrics) ->
+        minValue = NaN
+        watermarks = {}
+
+        for key, value of metrics.values
+          subtaskIndex = key.replace('.currentLowWatermark', '')
+          watermarks[subtaskIndex] = value
+
+          if (isNaN(minValue) || value < minValue)
+            minValue = value
+
+        if (!isNaN(minValue) && minValue > watermarksConfig.noWatermark)
+          lowWatermark = minValue
+        else
+          # NaN indicates no watermark available
+          lowWatermark = NaN
+
+        deferred.resolve({"lowWatermark": lowWatermark, "watermarks": watermarks})
+
+      deferred.promise
+
     deferred = $q.defer()
     watermarks = {}
-    jid = $scope.job.jid
+
+    # Request watermarks for each node and update watermarks
+    len = nodes.length
     angular.forEach nodes, (node, index) =>
-      metricIds = []
-      # for each node, we need to specify which metrics we want to collect, for each subtask, we need to fetch the currentLowWatermark, and each param is formed by concatenating subtask index to '.currentLowWatermark'.
-      for num in [0..node.parallelism - 1]
-        metricIds.push(num + ".currentLowWatermark")
-      MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
-        values = []
-        for key, value of data.values
-          values.push(id: key.replace('.currentLowWatermark', ''), value: value)
-        watermarks[node.id] = values
-        if index >= $scope.plan.nodes.length - 1
+      nodeId = node.id
+      requestWatermarkForNode(node).then (data) ->
+        watermarks[nodeId] = data
+        if (index >= len - 1)
           deferred.resolve(watermarks)
+
     deferred.promise
 
-  getLowWatermarks = (watermarks)->
-    lowWatermarks = []
-    for k,v of watermarks
-      minValue = Math.min.apply(null,(watermark.value for watermark in v))
-      lowWatermarks[k] = if minValue <= watermarksConfig.minValue || v.length == 0 then 'No Watermark' else minValue
-    return lowWatermarks
+  # Returns true if the lowWatermark is != NaN
+  $scope.hasWatermark = (nodeid) ->
+    $scope.watermarks[nodeid] && !isNaN($scope.watermarks[nodeid]["lowWatermark"])
 
   $scope.$watch 'plan', (newPlan) ->
     if newPlan
       getWatermarks(newPlan.nodes).then (data) ->
         $scope.watermarks = data
-        $scope.lowWatermarks = getLowWatermarks(data)
 
-  $scope.$on 'reload', (event) ->
+  $scope.$on 'reload', () ->
     if $scope.plan
       getWatermarks($scope.plan.nodes).then (data) ->
         $scope.watermarks = data
-        $scope.lowWatermarks = getLowWatermarks(data)
 
 # --------------------------------------
 
@@ -359,14 +394,3 @@ angular.module('flinkApp')
   loadMetrics() if $scope.nodeid
 
 # --------------------------------------
-
-.controller 'JobPlanWatermarksController', ($scope, $filter) ->
-  $scope.hasWatermarks = (nodeid) ->
-    return true if $scope.watermarksByNode(nodeid).length
-
-  $scope.watermarksByNode = (nodeid) ->
-    if $scope.watermarks != null && $scope.watermarks[nodeid] && $scope.watermarks[nodeid].length
-      return $scope.watermarks[nodeid]
-    return []
-
-# --------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
index 950cf06..36b0c43 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
@@ -169,7 +169,7 @@ angular.module('flinkApp')
 
 # ----------------------------------------------
 
-.directive 'split', () -> 
+.directive 'split', () ->
   return compile: (tElem, tAttrs) ->
       Split(tElem.children(), (
         sizes: [50, 50]
@@ -189,7 +189,7 @@ angular.module('flinkApp')
 
   scope:
     plan: '='
-    lowWatermarks: '='
+    watermarks: '='
     setNode: '&'
 
   link: (scope, elem, attrs) ->
@@ -436,10 +436,11 @@ angular.module('flinkApp')
             return el.step_function[j]  if el.step_function[j].id is nodeID
 
     mergeWatermarks = (data, watermarks) ->
-      for k,v of watermarks
+      if (!_.isEmpty(watermarks))
         for node in data.nodes
-          if node.id == k
-            node.lowWatermark = v
+          if (watermarks[node.id] && !isNaN(watermarks[node.id]["lowWatermark"]))
+            node.lowWatermark = watermarks[node.id]["lowWatermark"]
+
       return data
 
     lastPosition = 0
@@ -456,7 +457,7 @@ angular.module('flinkApp')
           marginy: 40
           })
 
-        loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.lowWatermarks))
+        loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.watermarks))
 
         d3mainSvgG.selectAll("*").remove()
 
@@ -494,7 +495,7 @@ angular.module('flinkApp')
     scope.$watch attrs.plan, (newPlan) ->
       drawGraph() if newPlan
 
-    scope.$watch attrs.lowWatermarks, (newLowWatermarks) ->
-      drawGraph() if newLowWatermarks && scope.plan
+    scope.$watch attrs.watermarks, (newWatermarks) ->
+      drawGraph() if newWatermarks && scope.plan
 
     return