You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/05 21:52:21 UTC

spark git commit: [SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics

Repository: spark
Updated Branches:
  refs/heads/master 47728db7c -> 489700c80


[SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics

This is the initial work of SPARK-6939. Not yet ready for code review. Here are the screenshots:

![graph1](https://cloud.githubusercontent.com/assets/1000778/7165766/465942e0-e3dc-11e4-9b05-c184b09d75dc.png)

![graph2](https://cloud.githubusercontent.com/assets/1000778/7165779/53f13f34-e3dc-11e4-8714-a4a75b7e09ff.png)

TODOs:
- [x] Display more information on mouse hover
- [x] Align the timeline and distribution graphs
- [x] Clean up the code

Author: zsxwing <zs...@gmail.com>

Closes #5533 from zsxwing/SPARK-6939 and squashes the following commits:

9f7cd19 [zsxwing] Merge branch 'master' into SPARK-6939
deacc3f [zsxwing] Remove unused import
cd03424 [zsxwing] Fix .rat-excludes
70cc87d [zsxwing] Streaming Scheduling Delay => Scheduling Delay
d457277 [zsxwing] Fix UIUtils in BatchPage
b3f303e [zsxwing] Add comments for unclear classes and methods
ff0bff8 [zsxwing] Make InputDStream.name private[streaming]
cc392c5 [zsxwing] Merge branch 'master' into SPARK-6939
e275e23 [zsxwing] Move time related methods to Streaming's UIUtils
d5d86f6 [zsxwing] Fix incorrect lastErrorTime
3be4b7a [zsxwing] Use InputInfo
b50fa32 [zsxwing] Jump to the batch page when clicking a point in the timeline graphs
203605d [zsxwing] Merge branch 'master' into SPARK-6939
74307cf [zsxwing] Reuse the data for histogram graphs to reduce the page size
2586916 [zsxwing] Merge branch 'master' into SPARK-6939
70d8533 [zsxwing] Remove BatchInfo.numRecords and a few renames
7bbdc0a [zsxwing] Hide the receiver sub table if no receiver
a2972e9 [zsxwing] Add some ui tests for StreamingPage
fd03ad0 [zsxwing] Add a test to verify no memory leak
4a8f886 [zsxwing] Merge branch 'master' into SPARK-6939
18607a1 [zsxwing] Merge branch 'master' into SPARK-6939
d0b0aec [zsxwing] Clean up the codes
a459f49 [zsxwing] Add a dash line to processing time graphs
8e4363c [zsxwing] Prepare for the demo
c81a1ee [zsxwing] Change time unit in the graphs automatically
4c0b43f [zsxwing] Update Streaming UI
04c7500 [zsxwing] Make the server and client use the same timezone
fed8219 [zsxwing] Move the x axis at the top and show a better tooltip
c23ce10 [zsxwing] Make two graphs close
d78672a [zsxwing] Make the X axis use the same range
881c907 [zsxwing] Use histogram for distribution
5688702 [zsxwing] Fix the unit test
ddf741a [zsxwing] Fix the unit test
ad93295 [zsxwing] Remove unnecessary codes
a0458f9 [zsxwing] Clean the codes
b82ed1e [zsxwing] Update the graphs as per comments
dd653a1 [zsxwing] Add timeline and histogram graphs for streaming statistics


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/489700c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/489700c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/489700c8

Branch: refs/heads/master
Commit: 489700c809a7c0a836538f3d0bd58bed609e8768
Parents: 47728db
Author: zsxwing <zs...@gmail.com>
Authored: Tue May 5 12:52:16 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue May 5 12:52:16 2015 -0700

----------------------------------------------------------------------
 LICENSE                                         |  30 +
 .../apache/spark/ui/static/bootstrap-tooltip.js | 135 +++-
 .../apache/spark/ui/static/streaming-page.css   |  58 ++
 .../apache/spark/ui/static/streaming-page.js    | 274 ++++++++
 .../apache/spark/streaming/DStreamGraph.scala   |   4 +
 .../spark/streaming/dstream/InputDStream.scala  |   5 +
 .../streaming/scheduler/ReceiverInfo.scala      |   3 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  13 +-
 .../spark/streaming/ui/AllBatchesTable.scala    |  10 +-
 .../apache/spark/streaming/ui/BatchPage.scala   |  22 +-
 .../apache/spark/streaming/ui/BatchUIData.scala |   4 +-
 .../ui/StreamingJobProgressListener.scala       |  68 +-
 .../spark/streaming/ui/StreamingPage.scala      | 621 ++++++++++++++-----
 .../org/apache/spark/streaming/ui/UIUtils.scala |  74 +++
 .../spark/streaming/UISeleniumSuite.scala       |  32 +-
 .../ui/StreamingJobProgressListenerSuite.scala  |  52 +-
 .../spark/streaming/ui/UIUtilsSuite.scala       |  67 ++
 17 files changed, 1228 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 21c42e9..b2001f0 100644
--- a/LICENSE
+++ b/LICENSE
@@ -643,6 +643,36 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 
+========================================================================
+For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js):
+========================================================================
+
+Copyright (c) 2010-2015, Michael Bostock
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+* The name Michael Bostock may not be used to endorse or promote products
+  derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 ========================================================================
 For Scala Interpreter classes (all .scala files in repl/src/main/scala

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
index 2934181..acd6096 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js
@@ -1,9 +1,9 @@
 /* ===========================================================
- * bootstrap-tooltip.js v2.2.2
- * http://twitter.github.com/bootstrap/javascript.html#tooltips
+ * bootstrap-tooltip.js v2.3.2
+ * http://getbootstrap.com/2.3.2/javascript.html#tooltips
  * Inspired by the original jQuery.tipsy by Jason Frame
  * ===========================================================
- * Copyright 2012 Twitter, Inc.
+ * Copyright 2013 Twitter, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,19 +38,27 @@
   , init: function (type, element, options) {
       var eventIn
         , eventOut
+        , triggers
+        , trigger
+        , i
 
       this.type = type
       this.$element = $(element)
       this.options = this.getOptions(options)
       this.enabled = true
 
-      if (this.options.trigger == 'click') {
-        this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
-      } else if (this.options.trigger != 'manual') {
-        eventIn = this.options.trigger == 'hover' ? 'mouseenter' : 'focus'
-        eventOut = this.options.trigger == 'hover' ? 'mouseleave' : 'blur'
-        this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
-        this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
+      triggers = this.options.trigger.split(' ')
+
+      for (i = triggers.length; i--;) {
+        trigger = triggers[i]
+        if (trigger == 'click') {
+          this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this))
+        } else if (trigger != 'manual') {
+          eventIn = trigger == 'hover' ? 'mouseenter' : 'focus'
+          eventOut = trigger == 'hover' ? 'mouseleave' : 'blur'
+          this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this))
+          this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this))
+        }
       }
 
       this.options.selector ?
@@ -59,7 +67,7 @@
     }
 
   , getOptions: function (options) {
-      options = $.extend({}, $.fn[this.type].defaults, options, this.$element.data())
+      options = $.extend({}, $.fn[this.type].defaults, this.$element.data(), options)
 
       if (options.delay && typeof options.delay == 'number') {
         options.delay = {
@@ -72,7 +80,15 @@
     }
 
   , enter: function (e) {
-      var self = $(e.currentTarget)[this.type](this._options).data(this.type)
+      var defaults = $.fn[this.type].defaults
+        , options = {}
+        , self
+
+      this._options && $.each(this._options, function (key, value) {
+        if (defaults[key] != value) options[key] = value
+      }, this)
+
+      self = $(e.currentTarget)[this.type](options).data(this.type)
 
       if (!self.options.delay || !self.options.delay.show) return self.show()
 
@@ -97,14 +113,16 @@
 
   , show: function () {
       var $tip
-        , inside
         , pos
         , actualWidth
         , actualHeight
         , placement
         , tp
+        , e = $.Event('show')
 
       if (this.hasContent() && this.enabled) {
+        this.$element.trigger(e)
+        if (e.isDefaultPrevented()) return
         $tip = this.tip()
         this.setContent()
 
@@ -116,19 +134,18 @@
           this.options.placement.call(this, $tip[0], this.$element[0]) :
           this.options.placement
 
-        inside = /in/.test(placement)
-
         $tip
           .detach()
           .css({ top: 0, left: 0, display: 'block' })
-          .insertAfter(this.$element)
 
-        pos = this.getPosition(inside)
+        this.options.container ? $tip.appendTo(this.options.container) : $tip.insertAfter(this.$element)
+
+        pos = this.getPosition()
 
         actualWidth = $tip[0].offsetWidth
         actualHeight = $tip[0].offsetHeight
 
-        switch (inside ? placement.split(' ')[1] : placement) {
+        switch (placement) {
           case 'bottom':
             tp = {top: pos.top + pos.height, left: pos.left + pos.width / 2 - actualWidth / 2}
             break
@@ -143,11 +160,56 @@
             break
         }
 
-        $tip
-          .offset(tp)
-          .addClass(placement)
-          .addClass('in')
+        this.applyPlacement(tp, placement)
+        this.$element.trigger('shown')
+      }
+    }
+
+  , applyPlacement: function(offset, placement){
+      var $tip = this.tip()
+        , width = $tip[0].offsetWidth
+        , height = $tip[0].offsetHeight
+        , actualWidth
+        , actualHeight
+        , delta
+        , replace
+
+      $tip
+        .offset(offset)
+        .addClass(placement)
+        .addClass('in')
+
+      actualWidth = $tip[0].offsetWidth
+      actualHeight = $tip[0].offsetHeight
+
+      if (placement == 'top' && actualHeight != height) {
+        offset.top = offset.top + height - actualHeight
+        replace = true
+      }
+
+      if (placement == 'bottom' || placement == 'top') {
+        delta = 0
+
+        if (offset.left < 0){
+          delta = offset.left * -2
+          offset.left = 0
+          $tip.offset(offset)
+          actualWidth = $tip[0].offsetWidth
+          actualHeight = $tip[0].offsetHeight
+        }
+
+        this.replaceArrow(delta - width + actualWidth, actualWidth, 'left')
+      } else {
+        this.replaceArrow(actualHeight - height, actualHeight, 'top')
       }
+
+      if (replace) $tip.offset(offset)
+    }
+
+  , replaceArrow: function(delta, dimension, position){
+      this
+        .arrow()
+        .css(position, delta ? (50 * (1 - delta / dimension) + "%") : '')
     }
 
   , setContent: function () {
@@ -161,6 +223,10 @@
   , hide: function () {
       var that = this
         , $tip = this.tip()
+        , e = $.Event('hide')
+
+      this.$element.trigger(e)
+      if (e.isDefaultPrevented()) return
 
       $tip.removeClass('in')
 
@@ -179,6 +245,8 @@
         removeWithAnimation() :
         $tip.detach()
 
+      this.$element.trigger('hidden')
+
       return this
     }
 
@@ -193,11 +261,12 @@
       return this.getTitle()
     }
 
-  , getPosition: function (inside) {
-      return $.extend({}, (inside ? {top: 0, left: 0} : this.$element.offset()), {
-        width: this.$element[0].offsetWidth
-      , height: this.$element[0].offsetHeight
-      })
+  , getPosition: function () {
+      var el = this.$element[0]
+      return $.extend({}, (typeof el.getBoundingClientRect == 'function') ? el.getBoundingClientRect() : {
+        width: el.offsetWidth
+      , height: el.offsetHeight
+      }, this.$element.offset())
     }
 
   , getTitle: function () {
@@ -215,6 +284,10 @@
       return this.$tip = this.$tip || $(this.options.template)
     }
 
+  , arrow: function(){
+      return this.$arrow = this.$arrow || this.tip().find(".tooltip-arrow")
+    }
+
   , validate: function () {
       if (!this.$element[0].parentNode) {
         this.hide()
@@ -236,8 +309,8 @@
     }
 
   , toggle: function (e) {
-      var self = $(e.currentTarget)[this.type](this._options).data(this.type)
-      self[self.tip().hasClass('in') ? 'hide' : 'show']()
+      var self = e ? $(e.currentTarget)[this.type](this._options).data(this.type) : this
+      self.tip().hasClass('in') ? self.hide() : self.show()
     }
 
   , destroy: function () {
@@ -269,10 +342,11 @@
   , placement: 'top'
   , selector: false
   , template: '<div class="tooltip"><div class="tooltip-arrow"></div><div class="tooltip-inner"></div></div>'
-  , trigger: 'hover'
+  , trigger: 'hover focus'
   , title: ''
   , delay: 0
   , html: false
+  , container: false
   }
 
 
@@ -285,4 +359,3 @@
   }
 
 }(window.jQuery);
-

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
new file mode 100644
index 0000000..5da9d63
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+.graph {
+  font: 10px sans-serif;
+}
+
+.axis path, .axis line {
+  fill: none;
+  stroke: gray;
+  shape-rendering: crispEdges;
+}
+
+.axis text {
+  fill: gray;
+}
+
+.tooltip-inner {
+  max-width: 500px !important; /* Make sure we only have one line tooltip */
+}
+
+.line {
+  fill: none;
+  stroke: #0088cc;
+  stroke-width: 1.5px;
+}
+
+.bar rect {
+  fill: #0088cc;
+  shape-rendering: crispEdges;
+}
+
+.bar rect:hover {
+  fill: #00c2ff;
+}
+
+.timeline {
+  width: 500px;
+}
+
+.histogram {
+  width: auto;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
new file mode 100644
index 0000000..a4e03b1
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+// timeFormat: StreamingPage.scala will generate a global "timeFormat" dictionary to store the time
+// and its formatted string. Because we cannot specify a timezone in JavaScript, to make sure the
+// server and client use the same timezone, we use the "timeFormat" dictionary to format all time
+// values used in the graphs.
+
+// A global margin left for all timeline graphs. It will be set in "registerTimeline". This will be
+// used to align all timeline graphs.
+var maxMarginLeftForTimeline = 0;
+
+// The max X values for all histograms. It will be set in "registerHistogram".
+var maxXForHistogram = 0;
+
+var histogramBinCount = 10;
+var yValueFormat = d3.format(",.2f");
+
+// Show a tooltip "text" for "node"
+function showBootstrapTooltip(node, text) {
+    $(node).tooltip({title: text, trigger: "manual", container: "body"});
+    $(node).tooltip("show");
+}
+
+// Hide the tooltip for "node"
+function hideBootstrapTooltip(node) {
+    $(node).tooltip("destroy");
+}
+
+// Register a timeline graph. All timeline graphs should be registered before calling any
+// "drawTimeline" so that we can determine the max margin left for all timeline graphs.
+function registerTimeline(minY, maxY) {
+    var numOfChars = yValueFormat(maxY).length;
+    // The least width (in px) needed to show the formatted "maxY" in the graph
+    var pxForMaxY = numOfChars * 8 + 10;
+    // Make sure we have enough space to show the ticks in the y axis of timeline
+    maxMarginLeftForTimeline = pxForMaxY > maxMarginLeftForTimeline? pxForMaxY : maxMarginLeftForTimeline;
+}
+
+// Register a histogram graph. All histogram graphs should be registered before calling any
+// "drawHistogram" so that we can determine the max X value for histograms.
+function registerHistogram(values, minY, maxY) {
+    var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+    // d.x is the y values while d.y is the x values
+    var maxX = d3.max(data, function(d) { return d.y; });
+    maxXForHistogram = maxX > maxXForHistogram ? maxX : maxXForHistogram;
+}
+
+// Draw a line between (x1, y1) and (x2, y2)
+function drawLine(svg, xFunc, yFunc, x1, y1, x2, y2) {
+    var line = d3.svg.line()
+        .x(function(d) { return xFunc(d.x); })
+        .y(function(d) { return yFunc(d.y); });
+    var data = [{x: x1, y: y1}, {x: x2, y: y2}];
+    svg.append("path")
+        .datum(data)
+        .style("stroke-dasharray", ("6, 6"))
+        .style("stroke", "lightblue")
+        .attr("class", "line")
+        .attr("d", line);
+}
+
+/**
+ * @param id the `id` used in the html `div` tag
+ * @param data the data for the timeline graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
+ */
+function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
+    // Hide the right border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
+    d3.select(d3.select(id).node().parentNode)
+        .style("padding", "8px 0 8px 8px")
+        .style("border-right", "0px solid white");
+
+    var margin = {top: 20, right: 27, bottom: 30, left: maxMarginLeftForTimeline};
+    var width = 500 - margin.left - margin.right;
+    var height = 150 - margin.top - margin.bottom;
+
+    var x = d3.scale.linear().domain([minX, maxX]).range([0, width]);
+    var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
+
+    var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
+    var formatYValue = d3.format(",.2f");
+    var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue);
+
+    var line = d3.svg.line()
+        .x(function(d) { return x(d.x); })
+        .y(function(d) { return y(d.y); });
+
+    var svg = d3.select(id).append("svg")
+        .attr("width", width + margin.left + margin.right)
+        .attr("height", height + margin.top + margin.bottom)
+        .append("g")
+            .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+    // Only show the first and last time in the graph
+    xAxis.tickValues(x.domain());
+
+    svg.append("g")
+        .attr("class", "x axis")
+        .attr("transform", "translate(0," + height + ")")
+        .call(xAxis)
+
+    svg.append("g")
+        .attr("class", "y axis")
+        .call(yAxis)
+        .append("text")
+            .attr("transform", "translate(0," + (-3) + ")")
+            .text(unitY);
+
+
+    if (batchInterval && batchInterval <= maxY) {
+        drawLine(svg, x, y, minX, batchInterval, maxX, batchInterval);
+    }
+
+    svg.append("path")
+        .datum(data)
+        .attr("class", "line")
+        .attr("d", line);
+
+    // Add points to the line. However, we make it invisible at first. But when the user moves mouse
+    // over a point, it will be displayed with its detail.
+    svg.selectAll(".point")
+        .data(data)
+        .enter().append("circle")
+            .attr("stroke", "white") // white and opacity = 0 make it invisible
+            .attr("fill", "white")
+            .attr("opacity", "0")
+            .attr("cx", function(d) { return x(d.x); })
+            .attr("cy", function(d) { return y(d.y); })
+            .attr("r", function(d) { return 3; })
+            .on('mouseover', function(d) {
+                var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
+                showBootstrapTooltip(d3.select(this).node(), tip);
+                // show the point
+                d3.select(this)
+                    .attr("stroke", "steelblue")
+                    .attr("fill", "steelblue")
+                    .attr("opacity", "1");
+            })
+            .on('mouseout',  function() {
+                hideBootstrapTooltip(d3.select(this).node());
+                // hide the point
+                d3.select(this)
+                    .attr("stroke", "white")
+                    .attr("fill", "white")
+                    .attr("opacity", "0");
+            })
+            .on("click", function(d) {
+                window.location.href = "batch/?id=" + d.x;
+            });
+}
+
+/**
+ * @param id the `id` used in the html `div` tag
+ * @param values the data for the histogram graph
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
+ */
+function drawHistogram(id, values, minY, maxY, unitY, batchInterval) {
+    // Hide the left border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
+    d3.select(d3.select(id).node().parentNode)
+        .style("padding", "8px 8px 8px 0")
+        .style("border-left", "0px solid white");
+
+    var margin = {top: 20, right: 30, bottom: 30, left: 10};
+    var width = 300 - margin.left - margin.right;
+    var height = 150 - margin.top - margin.bottom;
+
+    var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
+    var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
+
+    var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
+    var yAxis = d3.svg.axis().scale(y).orient("left").ticks(0).tickFormat(function(d) { return ""; });
+
+    var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+
+    var svg = d3.select(id).append("svg")
+        .attr("width", width + margin.left + margin.right)
+        .attr("height", height + margin.top + margin.bottom)
+        .append("g")
+            .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+    if (batchInterval && batchInterval <= maxY) {
+        drawLine(svg, x, y, 0, batchInterval, maxXForHistogram, batchInterval);
+    }
+
+    svg.append("g")
+        .attr("class", "x axis")
+        .call(xAxis)
+
+    svg.append("g")
+        .attr("class", "y axis")
+        .call(yAxis)
+
+    var bar = svg.selectAll(".bar")
+        .data(data)
+        .enter()
+        .append("g")
+            .attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx))  + ")";})
+            .attr("class", "bar").append("rect")
+            .attr("width", function(d) { return x(d.y); })
+            .attr("height", function(d) { return height - y(d.dx); })
+            .on('mouseover', function(d) {
+                var percent = yValueFormat(d.y * 100.0 / values.length) + "%";
+                var tip = d.y + " batches (" + percent + ") between " + yValueFormat(d.x) + " and " + yValueFormat(d.x + d.dx) + " " + unitY;
+                showBootstrapTooltip(d3.select(this).node(), tip);
+            })
+            .on('mouseout',  function() {
+                hideBootstrapTooltip(d3.select(this).node());
+            });
+
+    if (batchInterval && batchInterval <= maxY) {
+        // Add the "stable" text to the graph below the batch interval line.
+        var stableXOffset = x(maxXForHistogram) - 20;
+        var stableYOffset = y(batchInterval) + 15;
+        svg.append("text")
+            .style("fill", "lightblue")
+            .attr("class", "stable-text")
+            .attr("text-anchor", "middle")
+            .attr("transform", "translate(" + stableXOffset + "," + stableYOffset + ")")
+            .text("stable")
+            .on('mouseover', function(d) {
+              var tip = "Processing Time <= Batch Interval (" + yValueFormat(batchInterval) +" " + unitY +")";
+              showBootstrapTooltip(d3.select(this).node(), tip);
+            })
+            .on('mouseout',  function() {
+              hideBootstrapTooltip(d3.select(this).node());
+            });
+    }
+}
+
+$(function() {
+    function getParameterFromURL(param)
+    {
+        var parameters = window.location.search.substring(1); // Remove "?"
+        var keyValues = parameters.split('&');
+        for (var i = 0; i < keyValues.length; i++)
+        {
+            var paramKeyValue = keyValues[i].split('=');
+            if (paramKeyValue[0] == param)
+            {
+                return paramKeyValue[1];
+            }
+        }
+    }
+
+    if (getParameterFromURL("show-streams-detail") == "true") {
+        // Show the details for all InputDStream
+        $('#inputs-table').toggle('collapsed');
+        $('#triangle').html('&#9660;');
+    }
+});

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 1751404..9c7f698 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
+  def getInputStreamName(streamId: Int): Option[String] = synchronized {
+    inputStreams.find(_.id == streamId).map(_.name)
+  }
+
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
     val jobs = this.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index e4ad4b5..9716adb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -45,6 +45,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   val id = ssc.getNewInputStreamId()
 
   /**
+   * The name of this InputDStream. By default, it's the class name with its id.
+   */
+  private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
+
+  /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
    * Additionally it also ensures valid times are in strictly increasing order.
    * This ensures that InputDStream.compute() is called strictly on increasing

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 52f08b9..de85f24 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -32,6 +32,7 @@ case class ReceiverInfo(
     active: Boolean,
     location: String,
     lastErrorMessage: String = "",
-    lastError: String = ""
+    lastError: String = "",
+    lastErrorTime: Long = -1L
    ) {
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3c34139..f73f7e7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -155,10 +155,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   private def deregisterReceiver(streamId: Int, message: String, error: String) {
     val newReceiverInfo = receiverInfo.get(streamId) match {
       case Some(oldInfo) =>
-        oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
+        val lastErrorTime =
+          if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+        oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
+          lastError = error, lastErrorTime = lastErrorTime)
       case None =>
         logWarning("No prior receiver info")
-        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+        val lastErrorTime =
+          if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+          lastError = error, lastErrorTime = lastErrorTime)
     }
     receiverInfo -= streamId
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
@@ -182,7 +188,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         oldInfo.copy(lastErrorMessage = message, lastError = error)
       case None =>
         logWarning("No prior receiver info")
-        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+          lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
     }
     receiverInfo(streamId) = newReceiverInfo
     listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index e219e27..2960b52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import scala.xml.Node
 
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
 
 private[ui] abstract class BatchTableBase(tableId: String) {
 
@@ -32,12 +32,12 @@ private[ui] abstract class BatchTableBase(tableId: String) {
 
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
+    val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
     val eventCount = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
-    val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
     val processingTime = batch.processingDelay
-    val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
+    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
 
     <td sorttable_customkey={batchTime.toString}>
       <a href={s"batch?id=$batchTime"}>
@@ -107,7 +107,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
 
   private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
-    val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+    val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
     baseRow(batch) ++
       <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
         {formattedTotalDelay}

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 2da9a29..3f1cab6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.streaming.Time
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
@@ -73,8 +73,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
       dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
       flatMap(info => info.failureReason).headOption.getOrElse("")
-    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
-    val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
+    val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
+    val detailUrl = s"${SparkUIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
 
     // In the first row, output op id and its information needs to be shown. In other rows, these
     // cells will be taken up due to "rowspan".
@@ -110,7 +110,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       </td>
       <td class="progress-cell">
         {
-          UIUtils.makeProgressBar(
+          SparkUIUtils.makeProgressBar(
             started = sparkJob.numActiveTasks,
             completed = sparkJob.numCompletedTasks,
             failed = sparkJob.numFailedTasks,
@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         // If any job does not finish, set "formattedOutputOpDuration" to "-"
         "-"
       } else {
-        UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
       }
     generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
       sparkJobs.tail.map { sparkJob =>
@@ -212,24 +212,24 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
       throw new IllegalArgumentException(s"Missing id parameter")
     }
-    val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
+    val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
 
     val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
       throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
     }
 
     val formattedSchedulingDelay =
-      batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
+      batchUIData.schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
     val formattedProcessingTime =
-      batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
-    val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
+      batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
 
     val summary: NodeSeq =
       <div>
         <ul class="unstyled">
           <li>
             <strong>Batch Duration: </strong>
-            {UIUtils.formatDuration(streamingListener.batchDuration)}
+            {SparkUIUtils.formatDuration(streamingListener.batchDuration)}
           </li>
           <li>
             <strong>Input data size: </strong>
@@ -259,6 +259,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
 
     val content = summary ++ jobTable
 
-    UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
+    SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 99e10d2..a5514df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -26,7 +26,7 @@ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobI
 
 private[ui] case class BatchUIData(
     val batchTime: Time,
-    val receiverNumRecords: Map[Int, Long],
+    val streamIdToNumRecords: Map[Int, Long],
     val submissionTime: Long,
     val processingStartTime: Option[Long],
     val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
   /**
    * The number of recorders received by the receivers in this batch.
    */
-  def numRecords: Long = receiverNumRecords.map(_._2).sum
+  def numRecords: Long = streamIdToNumRecords.values.sum
 }
 
 private[ui] object BatchUIData {

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 24cbb2b..68e8ce9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
-import org.apache.spark.util.Distribution
 
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -38,7 +37,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private val waitingBatchUIData = new HashMap[Time, BatchUIData]
   private val runningBatchUIData = new HashMap[Time, BatchUIData]
   private val completedBatchUIData = new Queue[BatchUIData]
-  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
   private var totalProcessedRecords = 0L
@@ -145,7 +144,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
   }
 
-  def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
+  def numReceivers: Int = synchronized {
+    receiverInfos.size
+  }
 
   def numTotalCompletedBatches: Long = synchronized {
     totalCompletedBatches
@@ -175,39 +176,42 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     completedBatchUIData.toSeq
   }
 
-  def processingDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.processingDelay)
+  def streamName(streamId: Int): Option[String] = {
+    ssc.graph.getInputStreamName(streamId)
   }
 
-  def schedulingDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.schedulingDelay)
-  }
+  /**
+   * Return all InputDStream Ids
+   */
+  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
 
-  def totalDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.totalDelay)
-  }
-
-  def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
-    val latestBatchInfos = retainedBatches.reverse.take(batchUIDataLimit)
-    val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
-    val streamIds = ssc.graph.getInputStreams().map(_.id)
-    streamIds.map { id =>
-      val recordsOfParticularReceiver =
-        latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
-      val distribution = Distribution(recordsOfParticularReceiver)
-      (id, distribution)
+  /**
+   * Return all of the event rates for each InputDStream in each batch. The key of the return value
+   * is the stream id, and the value is a sequence of batch time with its event rate.
+   */
+  def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
+    val _retainedBatches = retainedBatches
+    val latestBatches = _retainedBatches.map { batchUIData =>
+      (batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
+    }
+    streamIds.map { streamId =>
+      val eventRates = latestBatches.map {
+        case (batchTime, streamIdToNumRecords) =>
+          val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
+          (batchTime, numRecords * 1000.0 / batchDuration)
+      }
+      (streamId, eventRates)
     }.toMap
   }
 
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
-    val lastReceiverNumRecords = lastReceivedBatch.map(_.receiverNumRecords)
-    val streamIds = ssc.graph.getInputStreams().map(_.id)
-    lastReceiverNumRecords.map { receiverNumRecords =>
-      streamIds.map { id =>
-        (id, receiverNumRecords.getOrElse(id, 0L))
+    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
+    lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
+      streamIds.map { streamId =>
+        (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
       }.toMap
     }.getOrElse {
-      streamIds.map(id => (id, 0L)).toMap
+      streamIds.map(streamId => (streamId, 0L)).toMap
     }
   }
 
@@ -215,10 +219,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     receiverInfos.get(receiverId)
   }
 
-  def receiverIds(): Iterable[Int] = synchronized {
-    receiverInfos.keys
-  }
-
   def lastCompletedBatch: Option[BatchUIData] = synchronized {
     completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
   }
@@ -227,15 +227,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     retainedBatches.lastOption
   }
 
-  private def retainedBatches: Seq[BatchUIData] = {
+  def retainedBatches: Seq[BatchUIData] = synchronized {
     (waitingBatchUIData.values.toSeq ++
       runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
   }
 
-  private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
-    Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
-  }
-
   def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
     val batchUIData = waitingBatchUIData.get(batchTime).orElse {
       runningBatchUIData.get(batchTime).orElse {

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index db37ae8..ecbebe5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -17,181 +17,454 @@
 
 package org.apache.spark.streaming.ui
 
-import java.util.Calendar
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.Node
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.Logging
 import org.apache.spark.ui._
-import org.apache.spark.ui.UIUtils._
-import org.apache.spark.util.Distribution
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+
+/**
+ * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
+ *
+ * @param timelineDivId the timeline `id` used in the html `div` tag
+ * @param histogramDivId the histogram `id` used in the html `div` tag
+ * @param data the (x, y) points of the graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ *                      the graph
+ */
+private[ui] class GraphUIData(
+    timelineDivId: String,
+    histogramDivId: String,
+    data: Seq[(Long, Double)],
+    minX: Long,
+    maxX: Long,
+    minY: Double,
+    maxY: Double,
+    unitY: String,
+    batchInterval: Option[Double] = None) {
+
+  private var dataJavaScriptName: String = _ // assigned in generateDataJs; null until then
+
+  def generateDataJs(jsCollector: JsCollector): Unit = {
+    val jsForData = data.map { case (x, y) =>
+      s"""{"x": $x, "y": $y}"""
+    }.mkString("[", ",", "]")
+    dataJavaScriptName = jsCollector.nextVariableName
+    jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+  }
+
+  def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
+    jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+    if (batchInterval.isDefined) {
+      jsCollector.addStatement(
+        "drawTimeline(" +
+          s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
+          s" ${batchInterval.get}" +
+          ");")
+    } else {
+      jsCollector.addStatement(
+        s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
+          s" '$unitY');")
+    }
+    <div id={timelineDivId}></div>
+  }
+
+  def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
+    val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
+    jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
+    if (batchInterval.isDefined) {
+      jsCollector.addStatement(
+        "drawHistogram(" +
+          s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+          ");")
+    } else {
+      jsCollector.addStatement(
+        s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
+    }
+    <div id={histogramDivId}></div>
+  }
+}
+
+/**
+ * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
+ * will be used in the timeline and histogram graphs.
+ *
+ * @param data (batchTime, milliseconds). "milliseconds" is something like "processing time".
+ */
+private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
+
+  /**
+   * Pair each batch time with its milliseconds value converted to `unit`.
+   */
+  def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
+    data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit))
+
+  /**
+   * Convert each milliseconds value to `unit`, dropping the batch time.
+   */
+  def histogramData(unit: TimeUnit): Seq[Double] =
+    data.map(x => UIUtils.convertToTimeUnit(x._2, unit))
+
+  val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
+
+  val formattedAvg: String = StreamingPage.formatDurationOption(avg)
+
+  val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max)
+}
+
+/**
+ * A helper class for "input rate" to generate data that will be used in the timeline and histogram
+ * graphs.
+ *
+ * @param data (batchTime, event-rate) pairs; `avg` and `max` are `None` when it is empty.
+ */
+private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
+
+  val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
+
+  val formattedAvg: String = avg.map(_.formatted("%.2f")).getOrElse("-") // "-" when no data
+
+  val max: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).max)
+}
 
 /** Page for Spark Web UI that shows statistics of a streaming job */
 private[ui] class StreamingPage(parent: StreamingTab)
   extends WebUIPage("") with Logging {
 
+  import StreamingPage._
+
   private val listener = parent.listener
   private val startTime = System.currentTimeMillis()
-  private val emptyCell = "-"
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
-    val content = listener.synchronized {
-      generateBasicStats() ++ <br></br> ++
-      <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
-      generateReceiverStats() ++
-      generateBatchStatsTable() ++
-      generateBatchListTables()
-    }
-    UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
+    val resources = generateLoadResources()
+    val basicInfo = generateBasicInfo()
+    val content = resources ++
+      basicInfo ++
+      listener.synchronized {
+        generateStatTable() ++
+          generateBatchListTables()
+      }
+    SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
   }
 
-  /** Generate basic stats of the streaming program */
-  private def generateBasicStats(): Seq[Node] = {
-    val timeSinceStart = System.currentTimeMillis() - startTime
+  /**
+   * Generate html that will load css/js files for StreamingPage
+   */
+  private def generateLoadResources(): Seq[Node] = {
     // scalastyle:off
-    <ul class ="unstyled">
-      <li>
-        <strong>Started at: </strong> {UIUtils.formatDate(startTime)}
-      </li>
-      <li>
-        <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
-      </li>
-      <li>
-        <strong>Network receivers: </strong>{listener.numReceivers}
-      </li>
-      <li>
-        <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
-      </li>
-      <li>
-        <a href="#completed"><strong>Completed batches: </strong></a>{listener.numTotalCompletedBatches}
-      </li>
-      <li>
-        <a href="#active"><strong>Active batches: </strong></a>{listener.numUnprocessedBatches}
-      </li>
-      <li>
-        <strong>Received events: </strong>{listener.numTotalReceivedRecords}
-      </li>
-      <li>
-        <strong>Processed events: </strong>{listener.numTotalProcessedRecords}
-      </li>
-    </ul>
+    <script src={SparkUIUtils.prependBaseUri("/static/d3.min.js")}></script>
+      <link rel="stylesheet" href={SparkUIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
+      <script src={SparkUIUtils.prependBaseUri("/static/streaming-page.js")}></script>
     // scalastyle:on
   }
 
-  /** Generate stats of data received by the receivers in the streaming program */
-  private def generateReceiverStats(): Seq[Node] = {
-    val receivedRecordDistributions = listener.receivedRecordsDistributions
-    val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
-    val table = if (receivedRecordDistributions.size > 0) {
-      val headerRow = Seq(
-        "Receiver",
-        "Status",
-        "Location",
-        "Events in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
-        "Minimum rate\n[events/sec]",
-        "Median rate\n[events/sec]",
-        "Maximum rate\n[events/sec]",
-        "Last Error"
-      )
-      val dataRows = listener.receiverIds().map { receiverId =>
-        val receiverInfo = listener.receiverInfo(receiverId)
-        val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
-        val receiverActive = receiverInfo.map { info =>
-          if (info.active) "ACTIVE" else "INACTIVE"
-        }.getOrElse(emptyCell)
-        val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
-        val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
-        val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
-          d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
-        }.getOrElse {
-          Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
-        }
-        val receiverLastError = listener.receiverInfo(receiverId).map { info =>
-          val msg = s"${info.lastErrorMessage} - ${info.lastError}"
-          if (msg.size > 100) msg.take(97) + "..." else msg
-        }.getOrElse(emptyCell)
-        Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
-          receivedRecordStats ++ Seq(receiverLastError)
-      }.toSeq
-      Some(listingTable(headerRow, dataRows))
-    } else {
-      None
-    }
+  /** Generate basic information of the streaming program */
+  private def generateBasicInfo(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime
+    <div>Running batches of
+      <strong>
+        {SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
+      </strong>
+      for
+      <strong>
+        {SparkUIUtils.formatDurationVerbose(timeSinceStart)}
+      </strong>
+      since
+      <strong>
+        {SparkUIUtils.formatDate(startTime)}
+      </strong>
+    </div>
+    <br />
+  }
 
-    val content =
-      <h5>Receiver Statistics</h5> ++
-      <div>{table.getOrElse("No receivers")}</div>
+  /**
+   * Generate a global "timeFormat" dictionary in the JavaScript to store the time and its formatted
+   * string. Because we cannot specify a timezone in JavaScript, to make sure the server and client
+   * use the same timezone, we use the "timeFormat" dictionary to format all time values used in the
+   * graphs.
+   *
+   * @param times all time values that will be used in the graphs.
+   */
+  private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
+    val dateFormat = new SimpleDateFormat("HH:mm:ss")
+    val js = "var timeFormat = {};\n" + times.map { time =>
+      val formattedTime = dateFormat.format(new Date(time))
+      s"timeFormat[$time] = '$formattedTime';"
+    }.mkString("\n")
 
-    content
+    <script>{Unparsed(js)}</script>
   }
 
-  /** Generate stats of batch jobs of the streaming program */
-  private def generateBatchStatsTable(): Seq[Node] = {
-    val numBatches = listener.retainedCompletedBatches.size
-    val lastCompletedBatch = listener.lastCompletedBatch
-    val table = if (numBatches > 0) {
-      val processingDelayQuantilesRow = {
-        Seq(
-          "Processing Time",
-          formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
-        ) ++ getQuantiles(listener.processingDelayDistribution)
-      }
-      val schedulingDelayQuantilesRow = {
-        Seq(
-          "Scheduling Delay",
-          formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
-        ) ++ getQuantiles(listener.schedulingDelayDistribution)
-      }
-      val totalDelayQuantilesRow = {
-        Seq(
-          "Total Delay",
-          formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
-        ) ++ getQuantiles(listener.totalDelayDistribution)
-      }
-      val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
-        "Median", "75th percentile", "Maximum")
-      val dataRows: Seq[Seq[String]] = Seq(
-        processingDelayQuantilesRow,
-        schedulingDelayQuantilesRow,
-        totalDelayQuantilesRow
-      )
-      Some(listingTable(headerRow, dataRows))
-    } else {
-      None
-    }
+  private def generateStatTable(): Seq[Node] = {
+    val batches = listener.retainedBatches
 
-    val content =
-      <h5>Batch Processing Statistics</h5> ++
-      <div>
-        <ul class="unstyled">
-          {table.getOrElse("No statistics have been generated yet.")}
-        </ul>
-      </div>
+    val batchTimes = batches.map(_.batchTime.milliseconds)
+    val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
+    val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
 
-    content
-  }
+    val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+      (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
+    })
 
+    val schedulingDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+      batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
+    })
+    val processingTime = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+      batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
+    })
+    val totalDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
+      batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
+    })
 
-  /**
-   * Returns a human-readable string representing a duration such as "5 second 35 ms"
-   */
-  private def formatDurationOption(msOption: Option[Long]): String = {
-    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+    // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" so that the
+    // three graphs share the same Y axis range.
+    val _maxTime =
+      (for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
+        m1 max m2 max m3).getOrElse(0L)
+    // Should start at 0
+    val minTime = 0L
+    val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
+    val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)
+
+    // Use the max input rate for all InputDStreams' graphs so that they share the same Y axis
+    // range. If it's not an integral number, round it up to the next integer.
+    val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+    val minEventRate = 0L
+
+    // JavaScript to show/hide the InputDStreams sub table.
+    val triangleJs =
+      s"""$$('#inputs-table').toggle('collapsed');
+         |var status = false;
+         |if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
+         |$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
+         |else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status  = false;}
+         |window.history.pushState('',
+         |    document.title, window.location.pathname + '?show-streams-detail=' + status);"""
+        .stripMargin.replaceAll("\\n", "") // it must be only one single line
+
+    val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
+
+    val jsCollector = new JsCollector
+
+    val graphUIDataForEventRateOfAllStreams =
+      new GraphUIData(
+        "all-stream-events-timeline",
+        "all-stream-events-histogram",
+        eventRateForAllStreams.data,
+        minBatchTime,
+        maxBatchTime,
+        minEventRate,
+        maxEventRate,
+        "events/sec")
+    graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+
+    val graphUIDataForSchedulingDelay =
+      new GraphUIData(
+        "scheduling-delay-timeline",
+        "scheduling-delay-histogram",
+        schedulingDelay.timelineData(normalizedUnit),
+        minBatchTime,
+        maxBatchTime,
+        minTime,
+        maxTime,
+        formattedUnit)
+    graphUIDataForSchedulingDelay.generateDataJs(jsCollector)
+
+    val graphUIDataForProcessingTime =
+      new GraphUIData(
+        "processing-time-timeline",
+        "processing-time-histogram",
+        processingTime.timelineData(normalizedUnit),
+        minBatchTime,
+        maxBatchTime,
+        minTime,
+        maxTime,
+        formattedUnit, Some(batchInterval))
+    graphUIDataForProcessingTime.generateDataJs(jsCollector)
+
+    val graphUIDataForTotalDelay =
+      new GraphUIData(
+        "total-delay-timeline",
+        "total-delay-histogram",
+        totalDelay.timelineData(normalizedUnit),
+        minBatchTime,
+        maxBatchTime,
+        minTime,
+        maxTime,
+        formattedUnit)
+    graphUIDataForTotalDelay.generateDataJs(jsCollector)
+
+    // It's false before the user registers the first InputDStream
+    val hasStream = listener.streamIds.nonEmpty
+
+    val numCompletedBatches = listener.retainedCompletedBatches.size
+    val numActiveBatches = batchTimes.length - numCompletedBatches
+    val table =
+      // scalastyle:off
+      <table id="stat-table" class="table table-bordered" style="width: auto">
+      <thead>
+        <tr>
+          <th style="width: 160px;"></th>
+          <th style="width: 492px;">Timelines (Last {batchTimes.length} batches, {numActiveBatches} active, {numCompletedBatches} completed)</th>
+          <th style="width: 300px;">Histograms</th></tr>
+      </thead>
+      <tbody>
+        <tr>
+          <td style="vertical-align: middle;">
+            <div style="width: 160px;">
+              <div>
+              {if (hasStream) {
+                <span id="triangle" onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
+              }}
+                <strong>Input Rate</strong>
+              </div>
+              <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
+            </div>
+          </td>
+          <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
+          <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
+        </tr>
+      {if (hasStream) {
+        <tr id="inputs-table" style="display: none;" >
+          <td colspan="3">
+            {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
+          </td>
+        </tr>
+      }}
+        <tr>
+          <td style="vertical-align: middle;">
+            <div style="width: 160px;">
+              <div><strong>Scheduling Delay</strong></div>
+              <div>Avg: {schedulingDelay.formattedAvg}</div>
+            </div>
+          </td>
+          <td class="timeline">{graphUIDataForSchedulingDelay.generateTimelineHtml(jsCollector)}</td>
+          <td class="histogram">{graphUIDataForSchedulingDelay.generateHistogramHtml(jsCollector)}</td>
+        </tr>
+        <tr>
+          <td style="vertical-align: middle;">
+            <div style="width: 160px;">
+              <div><strong>Processing Time</strong></div>
+              <div>Avg: {processingTime.formattedAvg}</div>
+            </div>
+          </td>
+          <td class="timeline">{graphUIDataForProcessingTime.generateTimelineHtml(jsCollector)}</td>
+          <td class="histogram">{graphUIDataForProcessingTime.generateHistogramHtml(jsCollector)}</td>
+        </tr>
+        <tr>
+          <td style="vertical-align: middle;">
+            <div style="width: 160px;">
+              <div><strong>Total Delay</strong></div>
+              <div>Avg: {totalDelay.formattedAvg}</div>
+            </div>
+          </td>
+          <td class="timeline">{graphUIDataForTotalDelay.generateTimelineHtml(jsCollector)}</td>
+          <td class="histogram">{graphUIDataForTotalDelay.generateHistogramHtml(jsCollector)}</td>
+        </tr>
+      </tbody>
+    </table>
+    // scalastyle:on
+
+    generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
   }
 
-  /** Get quantiles for any time distribution */
-  private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
-    timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
+  /** Build the collapsible sub table holding one pair of rows per input stream. */
+  private def generateInputDStreamsTable(
+      jsCollector: JsCollector,
+      minX: Long,
+      maxX: Long,
+      minY: Double,
+      maxY: Double): Seq[Node] = {
+    val streamRows: Seq[Node] = listener.receivedEventRateWithBatchTime.toSeq.flatMap {
+      case (id, rates) => generateInputDStreamRow(jsCollector, id, rates, minX, maxX, minY, maxY)
+    }
+    // scalastyle:off
+    <table class="table table-bordered" style="width: auto">
+      <thead>
+        <tr>
+          <th style="width: 151px;"></th>
+          <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Status</div></th>
+          <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Location</div></th>
+          <th style="width: 166px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Last Error Time</div></th>
+          <th>Last Error Message</th>
+        </tr>
+      </thead>
+      <tbody>
+        {streamRows}
+      </tbody>
+    </table>
+    // scalastyle:on
   }
 
-  /** Generate HTML table from string data */
-  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
-    def generateDataRow(data: Seq[String]): Seq[Node] = {
-      <tr> {data.map(d => <td>{d}</td>)} </tr>
-    }
-    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  /** Build the two table rows (info row, then graph row) for a single input stream. */
+  private def generateInputDStreamRow(
+      jsCollector: JsCollector,
+      streamId: Int,
+      eventRates: Seq[(Long, Double)],
+      minX: Long,
+      maxX: Long,
+      minY: Double,
+      maxY: Double): Seq[Node] = {
+    // Receiver details exist only for a ReceiverInputDStream; otherwise show `emptyCell`.
+    val info = listener.receiverInfo(streamId)
+    val displayName =
+      info.map(_.name).orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
+    val (status, location, lastError, lastErrorTime) = info match {
+      case Some(r) =>
+        val fullError = s"${r.lastErrorMessage} - ${r.lastError}"
+        (if (r.active) "ACTIVE" else "INACTIVE",
+          r.location,
+          // Cap the message at 100 characters, ellipsis included.
+          if (fullError.size > 100) fullError.take(97) + "..." else fullError,
+          if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime))
+      case None =>
+        (emptyCell, emptyCell, emptyCell, emptyCell)
+    }
+    val rates = new EventRateUIData(eventRates)
+
+    // All per-stream graphs receive the same X/Y ranges from the caller, so their axes are
+    // directly comparable.
+    val rateGraph =
+      new GraphUIData(
+        s"stream-$streamId-events-timeline",
+        s"stream-$streamId-events-histogram",
+        rates.data,
+        minX, maxX,
+        minY, maxY,
+        "events/sec")
+    rateGraph.generateDataJs(jsCollector)
+
+    <tr>
+      <td rowspan="2" style="vertical-align: middle; width: 151px;">
+        <div style="width: 151px;">
+          <div><strong>{displayName}</strong></div>
+          <div>Avg: {rates.formattedAvg} events/sec</div>
+        </div>
+      </td>
+      <td>{status}</td>
+      <td>{location}</td>
+      <td>{lastErrorTime}</td>
+      <td><div style="width: 292px;">{lastError}</div></td>
+    </tr>
+    <tr>
+      <td colspan="3" class="timeline">
+        {rateGraph.generateTimelineHtml(jsCollector)}
+      </td>
+      <td class="histogram">{rateGraph.generateHistogramHtml(jsCollector)}</td>
+    </tr>
   }
 
   private def generateBatchListTables(): Seq[Node] = {
@@ -216,3 +489,67 @@ private[ui] class StreamingPage(parent: StreamingTab)
   }
 }
 
+private[ui] object StreamingPage {
+  // HTML entities for the arrow that toggles the per-stream sub table.
+  val BLACK_RIGHT_TRIANGLE_HTML = "&#9654;"
+  val BLACK_DOWN_TRIANGLE_HTML = "&#9660;"
+
+  // Placeholder text for table cells that have no value.
+  val emptyCell = "-"
+
+  /** Format an optional duration in milliseconds verbosely, or `emptyCell` when absent. */
+  def formatDurationOption(msOption: Option[Long]): String = msOption match {
+    case Some(ms) => SparkUIUtils.formatDurationVerbose(ms)
+    case None => emptyCell
+  }
+}
+
+/**
+ * A helper class that gathers JavaScript statements while a page is being built and renders
+ * them as a single `<script>` element whose code runs once the DOM has finished loading.
+ */
+private[ui] class JsCollector {
+
+  /** JavaScript statements emitted first, ahead of everything in `mainScripts`. */
+  private val setupScripts = ArrayBuffer[String]()
+
+  /** JavaScript statements emitted after all of `setupScripts`. */
+  private val mainScripts = ArrayBuffer[String]()
+
+  /** How many variable names have been handed out so far. */
+  private var issuedNames = 0
+
+  /** Return the next unused JavaScript variable name ("v1", "v2", ...). */
+  def nextVariableName: String = {
+    issuedNames += 1
+    s"v$issuedNames"
+  }
+
+  /** Queue `js` to run before any statement added through `addStatement`. */
+  def addPreparedStatement(js: String): Unit = {
+    setupScripts += js
+  }
+
+  /** Queue `js` to run after every statement added through `addPreparedStatement`. */
+  def addStatement(js: String): Unit = {
+    mainScripts += js
+  }
+
+  /**
+   * Generate a html snippet that will execute all scripts when the DOM has finished loading.
+   */
+  def toHtml: Seq[Node] = {
+    val setupJs = setupScripts.mkString("\n")
+    val mainJs = mainScripts.mkString("\n")
+    // Note: `stripMargin` is applied after interpolation, i.e. to the collected statements too.
+    val js =
+      s"""
+         |$$(document).ready(function(){
+         |    $setupJs
+         |    $mainJs
+         |});""".stripMargin
+
+    <script>{Unparsed(js)}</script>
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000..c206f97
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.concurrent.TimeUnit
+
+object UIUtils {
+
+  /**
+   * Return the short string for a `TimeUnit`.
+   */
+  def shortTimeUnitString(unit: TimeUnit): String = unit match {
+    case TimeUnit.NANOSECONDS => "ns"
+    case TimeUnit.MICROSECONDS => "us"
+    case TimeUnit.MILLISECONDS => "ms"
+    case TimeUnit.SECONDS => "sec"
+    case TimeUnit.MINUTES => "min"
+    case TimeUnit.HOURS => "hrs"
+    case TimeUnit.DAYS => "days"
+  }
+
+  /**
+   * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
+   * after converting, also with its TimeUnit. Anything below one second (negative durations
+   * included) stays in milliseconds; the result is never finer than milliseconds.
+   */
+  def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
+    val seconds = milliseconds.toDouble / 1000
+    val minutes = seconds / 60
+    val hours = minutes / 60
+    // Thresholds are checked from the finest unit upwards; the first one that fits wins.
+    if (milliseconds < 1000) {
+      (milliseconds.toDouble, TimeUnit.MILLISECONDS)
+    } else if (seconds < 60) {
+      (seconds, TimeUnit.SECONDS)
+    } else if (minutes < 60) {
+      (minutes, TimeUnit.MINUTES)
+    } else if (hours < 24) {
+      (hours, TimeUnit.HOURS)
+    } else {
+      (hours / 24, TimeUnit.DAYS)
+    }
+  }
+
+  /**
+   * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
+   * will discard the fractional part. The scaling is done in `Double` arithmetic so that large
+   * inputs cannot silently overflow `Long` when converted to micro- or nanoseconds.
+   */
+  def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
+    case TimeUnit.NANOSECONDS => milliseconds * 1000.0 * 1000.0
+    case TimeUnit.MICROSECONDS => milliseconds * 1000.0
+    case TimeUnit.MILLISECONDS => milliseconds.toDouble
+    case TimeUnit.SECONDS => milliseconds / 1000.0
+    case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
+    case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
+    case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 8de43ba..2211f62 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -94,19 +94,34 @@ class UISeleniumSuite
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         // check whether streaming page exists
         go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
-        val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
-        statisticText should contain("Network receivers:")
-        statisticText should contain("Batch interval:")
-
+        val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+        h3Text should contain("Streaming Statistics")
+
+        // Check stat table
+        val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq
+        // Keep `should be (true)` on one line: a newline after `be` ends the statement early.
+        val timelinesHeader = "Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)"
+        statTableHeaders.exists(_.matches(timelinesHeader)) should be (true)
+        statTableHeaders should contain ("Histograms")
+
+        val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq
+        statTableCells.exists(_.contains("Input Rate")) should be (true)
+        statTableCells.exists(_.contains("Scheduling Delay")) should be (true)
+        statTableCells.exists(_.contains("Processing Time")) should be (true)
+        statTableCells.exists(_.contains("Total Delay")) should be (true)
+
+        // Check batch tables
         val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
         h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
         h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
         findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
+          List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+            "Status")
         }
         findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
-          List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
+          List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time",
+            "Total Delay")
         }
 
         val batchLinks =
@@ -176,9 +191,8 @@ class UISeleniumSuite
 
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
-        val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
-        statisticText should not contain ("Network receivers:")
-        statisticText should not contain ("Batch interval:")
+        val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
+        h3Text should not contain("Streaming Statistics")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index e874536..2a0f458 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
     batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
     batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
-    batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+    batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
     batchUIData.get.numRecords should be(600)
     batchUIData.get.outputOpIdSparkJobIdPairs should be
       Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -138,7 +138,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
   test("Remove the old completed batches when exceeding the limit") {
     val ssc = setupStreams(input, operation)
-    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
     val listener = new StreamingJobProgressListener(ssc)
 
     val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
@@ -155,7 +155,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
   test("out-of-order onJobStart and onBatchXXX") {
     val ssc = setupStreams(input, operation)
-    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
     val listener = new StreamingJobProgressListener(ssc)
 
     // fulfill completedBatchInfos
@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
     batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
     batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
-    batchUIData.get.receiverNumRecords should be (Map.empty)
+    batchUIData.get.streamIdToNumRecords should be (Map.empty)
     batchUIData.get.numRecords should be (0)
     batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
 
@@ -203,4 +203,48 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       (listener.waitingBatches.size + listener.runningBatches.size +
         listener.retainedCompletedBatches.size + 10)
   }
+
+  test("detect memory leak") {
+    val ssc = setupStreams(input, operation)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
+    val batchTime = Time(1000)
+    val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+
+    // Push twice as many batches through their whole lifecycle as the listener may retain,
+    // every one of them reported with the same batch time.
+    (0 until 2 * limit).foreach { _ =>
+      // onBatchSubmitted
+      listener.onBatchSubmitted(StreamingListenerBatchSubmitted(
+        BatchInfo(batchTime, streamIdToNumRecords, 1000, None, None)))
+
+      // onBatchStarted
+      listener.onBatchStarted(StreamingListenerBatchStarted(
+        BatchInfo(batchTime, streamIdToNumRecords, 1000, Some(2000), None)))
+
+      // onJobStart: job ids 0 and 1 for each of the output operation ids 0 and 1
+      for {
+        outputOpId <- 0 to 1
+        jobId <- 0 to 1
+      } {
+        listener.onJobStart(createJobStart(batchTime, outputOpId = outputOpId, jobId = jobId))
+      }
+
+      // onBatchCompleted (reported with the same values as the started event)
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(
+        BatchInfo(batchTime, streamIdToNumRecords, 1000, Some(2000), None)))
+    }
+
+    // Nothing is left in flight and the completed batches are capped at the limit.
+    listener.waitingBatches.size should be (0)
+    listener.runningBatches.size should be (0)
+    listener.retainedCompletedBatches.size should be (limit)
+
+    // The job bookkeeping may exceed the number of tracked batches by at most 10 entries.
+    val trackedBatches = listener.waitingBatches.size + listener.runningBatches.size +
+      listener.retainedCompletedBatches.size
+    listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <= (trackedBatches + 10)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/489700c8/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000..6df1a63
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class UIUtilsSuite extends FunSuite with Matchers {
+
+  test("shortTimeUnitString") {
+    val expected = Seq(
+      TimeUnit.NANOSECONDS -> "ns", TimeUnit.MICROSECONDS -> "us", TimeUnit.MILLISECONDS -> "ms",
+      TimeUnit.SECONDS -> "sec", TimeUnit.MINUTES -> "min", TimeUnit.HOURS -> "hrs",
+      TimeUnit.DAYS -> "days")
+    for ((unit, shortName) <- expected) assert(shortName === UIUtils.shortTimeUnitString(unit))
+  }
+
+  test("normalizeDuration") {
+    verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
+    verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
+    verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
+    verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
+    verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
+  }
+
+  test("convertToTimeUnit") {
+    val oneMinuteMs = 60 * 1000
+    verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, oneMinuteMs, TimeUnit.NANOSECONDS)
+    verifyConvertToTimeUnit(60.0 * 1000 * 1000, oneMinuteMs, TimeUnit.MICROSECONDS)
+    verifyConvertToTimeUnit(60 * 1000, oneMinuteMs, TimeUnit.MILLISECONDS)
+    verifyConvertToTimeUnit(60, oneMinuteMs, TimeUnit.SECONDS)
+    verifyConvertToTimeUnit(1, oneMinuteMs, TimeUnit.MINUTES)
+    verifyConvertToTimeUnit(1.0 / 60, oneMinuteMs, TimeUnit.HOURS)
+    verifyConvertToTimeUnit(1.0 / 60 / 24, oneMinuteMs, TimeUnit.DAYS)
+  }
+
+  /** Check that normalizing `input` milliseconds yields `expectedTime` in `expectedUnit`. */
+  private def verifyNormalizedTime(
+      expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
+    val (normalizedTime, normalizedUnit) = UIUtils.normalizeDuration(input)
+    normalizedTime should be (expectedTime +- 1E-6)
+    normalizedUnit should be (expectedUnit)
+  }
+
+  /** Check that `milliseconds` converted to `unit` equals `expectedTime`. */
+  private def verifyConvertToTimeUnit(
+      expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
+    UIUtils.convertToTimeUnit(milliseconds, unit) should be (expectedTime +- 1E-6)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org