Posted to commits@spark.apache.org by zs...@apache.org on 2017/01/18 18:55:33 UTC

spark git commit: [SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs

Repository: spark
Updated Branches:
  refs/heads/master 569e50680 -> a81e336f1


[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock that blocks other APIs. Because several StreamingJobProgressListener methods (numInactiveReceivers, streamName(streamId: Int), streamIds) call into DStreamGraph to read that information, the UI may hang when job generation is slow (e.g., while talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation.
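
For illustration, here is a minimal sketch of the pattern this patch applies (the class and names below are simplified stand-ins, not the actual Spark code): the values the UI needs are snapshotted into `@volatile` fields while the graph lock is held, so later reads never contend with job generation.

```scala
// Simplified sketch of the lock-avoidance pattern; not the actual Spark classes.
import scala.collection.mutable.ArrayBuffer

final class Graph {
  private val inputStreams = ArrayBuffer.empty[(String, Int)]

  // Snapshot for lock-free readers; written only while `this` is locked.
  @volatile private var nameAndId: Seq[(String, Int)] = Nil

  def start(): Unit = this.synchronized {
    inputStreams += (("socket-stream", 0))  // set up streams under the lock
    nameAndId = inputStreams.toList         // publish an immutable snapshot
  }

  def generateJobs(): Unit = this.synchronized {
    Thread.sleep(10000L)  // slow job generation can hold the lock for a long time
  }

  // UI thread: reads the snapshot and never blocks on the lock above.
  def streamName(id: Int): Option[String] = nameAndId.find(_._2 == id).map(_._1)
}
```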

## How was this patch tested?
Existing unit tests.

cc zsxwing

Author: uncleGen <hu...@gmail.com>

Closes #16601 from uncleGen/SPARK-19182.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a81e336f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a81e336f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a81e336f

Branch: refs/heads/master
Commit: a81e336f1eddc2c6245d807aae2c81ddc60eabf9
Parents: 569e506
Author: uncleGen <hu...@gmail.com>
Authored: Wed Jan 18 10:55:31 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jan 18 10:55:31 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/DStreamGraph.scala      | 13 +++++++++----
 .../streaming/ui/StreamingJobProgressListener.scala    |  8 ++++----
 2 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736e..dce2028 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)

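A note on the design (a reading of the hunk above, not text from the patch): both new fields are written only inside the synchronized start() block and never mutated afterwards, so marking them @volatile is enough to publish them safely to UI threads. A self-contained sketch of that publication pattern, with hypothetical names:

```scala
// Hypothetical sketch of safe publication via @volatile; names are illustrative.
object VolatilePublication {
  @volatile private var snapshot: Seq[(String, Int)] = Nil

  def initUnderLock(): Unit = synchronized {
    // Build the data under the lock, then publish an immutable value once.
    snapshot = Seq(("socket-stream", 0), ("kafka-stream", 1))
  }

  // Readers see either Nil (before start) or the fully built snapshot,
  // without acquiring the lock above.
  def lookup(id: Int): Option[String] = snapshot.find(_._2 == id).map(_._1)

  def main(args: Array[String]): Unit = {
    initUnderLock()
    println(lookup(1)) // Some(kafka-stream)
  }
}
```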
http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 95f5821..ed4c1e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData.toIndexedSeq
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value

