You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/04/02 00:24:57 UTC

spark git commit: [SPARK-12857][STREAMING] Standardize "records" and "events" on "records"

Repository: spark
Updated Branches:
  refs/heads/master c16a39688 -> 19f32f2d9


[SPARK-12857][STREAMING] Standardize "records" and "events" on "records"

## What changes were proposed in this pull request?

Currently the Streaming tab in the web UI uses "records" and "events" interchangeably; this PR tries to standardize them on "records".

"records" is chosen over "events" because:
- "records" is used extensively throughout the streaming documentation, code, and comments
- "events" is used only in Streaming UI-related code and comments

## How was this patch tested?

- existing test suites
- manually checking the Streaming UI tab

Author: Liwei Lin <lw...@gmail.com>

Closes #12032 from lw-lin/streaming-events-to-records.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19f32f2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19f32f2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19f32f2d

Branch: refs/heads/master
Commit: 19f32f2d99c3620c0e562a98f7890316ddad1de9
Parents: c16a396
Author: Liwei Lin <lw...@gmail.com>
Authored: Fri Apr 1 15:26:22 2016 -0700
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Apr 1 15:26:22 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/receiver/RateLimiter.scala  |  2 +-
 .../spark/streaming/ui/AllBatchesTable.scala    |  4 +-
 .../ui/StreamingJobProgressListener.scala       | 10 +--
 .../spark/streaming/ui/StreamingPage.scala      | 65 ++++++++++----------
 4 files changed, 41 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/19f32f2d/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index b218910..0a861f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -52,7 +52,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
    * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
    * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
    *
-   * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
+   * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
    */
   private[receiver] def updateRate(newRate: Long): Unit =
     if (newRate > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/19f32f2d/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index d339723..c024b4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -52,7 +52,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
     val batchTime = batch.batchTime.milliseconds
     val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
-    val eventCount = batch.numRecords
+    val numRecords = batch.numRecords
     val schedulingDelay = batch.schedulingDelay
     val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
     val processingTime = batch.processingDelay
@@ -65,7 +65,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
         {formattedBatchTime}
       </a>
     </td>
-      <td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
+      <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
       <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
         {formattedSchedulingDelay}
       </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/19f32f2d/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index d6fcc58..6985c37 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -202,21 +202,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
 
   /**
-   * Return all of the event rates for each InputDStream in each batch. The key of the return value
-   * is the stream id, and the value is a sequence of batch time with its event rate.
+   * Return all of the record rates for each InputDStream in each batch. The key of the return value
+   * is the stream id, and the value is a sequence of batch time with its record rate.
    */
-  def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
+  def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
     val _retainedBatches = retainedBatches
     val latestBatches = _retainedBatches.map { batchUIData =>
       (batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
     }
     streamIds.map { streamId =>
-      val eventRates = latestBatches.map {
+      val recordRates = latestBatches.map {
         case (batchTime, streamIdToNumRecords) =>
           val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
           (batchTime, numRecords * 1000.0 / batchDuration)
       }
-      (streamId, eventRates)
+      (streamId, recordRates)
     }.toMap
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/19f32f2d/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index fa40436..b97e24f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
  * A helper class for "input rate" to generate data that will be used in the timeline and histogram
  * graphs.
  *
- * @param data (batchTime, event-rate).
+ * @param data (batch time, record rate).
  */
-private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
+private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) {
 
   val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
 
@@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
     val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
 
-    val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+    val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo =>
       (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
     })
 
@@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
     // Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
     // If it's not an integral number, just use its ceil integral number.
-    val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
-    val minEventRate = 0L
+    val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+    val minRecordRate = 0L
 
     val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
 
     val jsCollector = new JsCollector
 
-    val graphUIDataForEventRateOfAllStreams =
+    val graphUIDataForRecordRateOfAllStreams =
       new GraphUIData(
-        "all-stream-events-timeline",
-        "all-stream-events-histogram",
-        eventRateForAllStreams.data,
+        "all-stream-records-timeline",
+        "all-stream-records-histogram",
+        recordRateForAllStreams.data,
         minBatchTime,
         maxBatchTime,
-        minEventRate,
-        maxEventRate,
-        "events/sec")
-    graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+        minRecordRate,
+        maxRecordRate,
+        "records/sec")
+    graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)
 
     val graphUIDataForSchedulingDelay =
       new GraphUIData(
@@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
                   <div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div>
                 }
               }
-              <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
+              <div>Avg: {recordRateForAllStreams.formattedAvg} records/sec</div>
             </div>
           </td>
-          <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
-          <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
+          <td class="timeline">{graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
+          <td class="histogram">{graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
         </tr>
       {if (hasStream) {
         <tr id="inputs-table" style="display: none;" >
           <td colspan="3">
-            {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
+            {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)}
           </td>
         </tr>
       }}
@@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
       maxX: Long,
       minY: Double,
       maxY: Double): Seq[Node] = {
-    val maxYCalculated = listener.receivedEventRateWithBatchTime.values
-      .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
+    val maxYCalculated = listener.receivedRecordRateWithBatchTime.values
+      .flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } }
       .reduceOption[Double](math.max)
       .map(_.ceil.toLong)
       .getOrElse(0L)
 
-    val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
-      case (streamId, eventRates) =>
-        generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
+    val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
+      case (streamId, recordRates) =>
+        generateInputDStreamRow(
+          jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
     }.foldLeft[Seq[Node]](Nil)(_ ++ _)
 
     // scalastyle:off
@@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private def generateInputDStreamRow(
       jsCollector: JsCollector,
       streamId: Int,
-      eventRates: Seq[(Long, Double)],
+      recordRates: Seq[(Long, Double)],
       minX: Long,
       maxX: Long,
       minY: Double,
@@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val receiverLastErrorTime = receiverInfo.map {
       r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
     }.getOrElse(emptyCell)
-    val receivedRecords = new EventRateUIData(eventRates)
+    val receivedRecords = new RecordRateUIData(recordRates)
 
-    val graphUIDataForEventRate =
+    val graphUIDataForRecordRate =
       new GraphUIData(
-        s"stream-$streamId-events-timeline",
-        s"stream-$streamId-events-histogram",
+        s"stream-$streamId-records-timeline",
+        s"stream-$streamId-records-histogram",
         receivedRecords.data,
         minX,
         maxX,
         minY,
         maxY,
-        "events/sec")
-    graphUIDataForEventRate.generateDataJs(jsCollector)
+        "records/sec")
+    graphUIDataForRecordRate.generateDataJs(jsCollector)
 
     <tr>
       <td rowspan="2" style="vertical-align: middle; width: 151px;">
         <div style="width: 151px;">
           <div style="word-wrap: break-word;"><strong>{receiverName}</strong></div>
-          <div>Avg: {receivedRecords.formattedAvg} events/sec</div>
+          <div>Avg: {receivedRecords.formattedAvg} records/sec</div>
         </div>
       </td>
       <td>{receiverActive}</td>
@@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
     </tr>
     <tr>
       <td colspan="3" class="timeline">
-        {graphUIDataForEventRate.generateTimelineHtml(jsCollector)}
+        {graphUIDataForRecordRate.generateTimelineHtml(jsCollector)}
       </td>
-      <td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td>
+      <td class="histogram">{graphUIDataForRecordRate.generateHistogramHtml(jsCollector)}</td>
     </tr>
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org