You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/19 15:15:13 UTC

[spark] branch branch-3.1 updated: [SPARK-35136] Remove initial null value of LiveStage.info

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a2b5fb3  [SPARK-35136] Remove initial null value of LiveStage.info
a2b5fb3 is described below

commit a2b5fb348c1dc31755701798bea9aa1cc70f9d3c
Author: Sander Goos <sa...@databricks.com>
AuthorDate: Mon Apr 19 15:09:29 2021 +0000

    [SPARK-35136] Remove initial null value of LiveStage.info
    
    ### What changes were proposed in this pull request?
    To prevent potential NullPointerExceptions, this PR changes the `LiveStage` constructor to take `info` as a constructor parameter and adds a nullcheck in  `AppStatusListener.activeStages`.
    
    ### Why are the changes needed?
    The `AppStatusListener.getOrCreateStage` would create a LiveStage object with the `info` field set to null and right after that set it to a specific StageInfo object. This can lead to a race condition when the `livestages` are read in between those calls. This could then lead to a null pointer exception in, for instance: `AppStatusListener.activeStages`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Regular CI/CD tests
    
    Closes #32233 from sander-goos/SPARK-35136-livestage.
    
    Authored-by: Sander Goos <sa...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 core/src/main/scala/org/apache/spark/status/AppStatusListener.scala | 4 ++--
 core/src/main/scala/org/apache/spark/status/LiveEntity.scala        | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 52d41cd..4245243 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1018,7 +1018,7 @@ private[spark] class AppStatusListener(
    */
   def activeStages(): Seq[v1.StageData] = {
     liveStages.values.asScala
-      .filter(_.info.submissionTime.isDefined)
+      .filter(s => Option(s.info).exists(_.submissionTime.isDefined))
       .map(_.toApi())
       .toList
       .sortBy(_.stageId)
@@ -1179,7 +1179,7 @@ private[spark] class AppStatusListener(
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
     val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
-      (_: (Int, Int)) => new LiveStage())
+      (_: (Int, Int)) => new LiveStage(info))
     stage.info = info
     stage
   }
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 38f1f25..d5cfdcb 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -393,14 +393,13 @@ private class LiveExecutorStageSummary(
 
 }
 
-private class LiveStage extends LiveEntity {
+private class LiveStage(var info: StageInfo) extends LiveEntity {
 
   import LiveEntityHelpers._
 
   var jobs = Seq[LiveJob]()
   var jobIds = Set[Int]()
 
-  var info: StageInfo = null
   var status = v1.StageStatus.PENDING
 
   var description: Option[String] = None

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org