You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/01/16 19:06:07 UTC

spark git commit: [SPARK-5231][WebUI] History Server shows wrong job submission time.

Repository: spark
Updated Branches:
  refs/heads/master f6b852aad -> e8422c521


[SPARK-5231][WebUI] History Server shows wrong job submission time.

History Server doesn't show collect job submission time.
It's because `JobProgressListener` updates job submission time every time `onJobStart` method is invoked from `ReplayListenerBus`.

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #4029 from sarutak/SPARK-5231 and squashes the following commits:

0af9e22 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5231
da8bd14 [Kousuke Saruta] Made submissionTime in SparkListenerJobStartas and completionTime in SparkListenerJobEnd as regular Long
0412a6a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5231
26b9b99 [Kousuke Saruta] Fixed the test cases
2d47bd3 [Kousuke Saruta] Fixed to record job submission time and completion time collectly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8422c52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8422c52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8422c52

Branch: refs/heads/master
Commit: e8422c521bc76bc4b03c337605f136403ea9f64a
Parents: f6b852a
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Fri Jan 16 10:05:11 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jan 16 10:05:11 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 16 ++++++----
 .../apache/spark/scheduler/SparkListener.scala  |  7 ++++-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 15 +++++-----
 .../spark/ui/jobs/JobProgressListener.scala     | 10 +++----
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  6 ++--
 .../org/apache/spark/util/JsonProtocol.scala    | 11 +++++--
 .../spark/scheduler/SparkListenerSuite.scala    | 10 ++++---
 .../ui/jobs/JobProgressListenerSuite.scala      |  6 ++--
 .../apache/spark/util/JsonProtocolSuite.scala   | 31 +++++++++++++++++---
 9 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8cb1591..3bca59e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -661,7 +661,7 @@ class DAGScheduler(
       // completion events or stage abort
       stageIdToStage -= s.id
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
     }
   }
 
@@ -710,7 +710,7 @@ class DAGScheduler(
         stage.latestInfo.stageFailed(stageFailedMessage)
         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
     }
   }
 
@@ -749,9 +749,11 @@ class DAGScheduler(
       logInfo("Missing parents: " + getMissingParentStages(finalStage))
       val shouldRunLocally =
         localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+      val jobSubmissionTime = clock.getTime()
       if (shouldRunLocally) {
         // Compute very short actions like first() or take() with no parent stages locally.
-        listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
+        listenerBus.post(
+          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
         runLocally(job)
       } else {
         jobIdToActiveJob(jobId) = job
@@ -759,7 +761,8 @@ class DAGScheduler(
         finalStage.resultOfJob = Some(job)
         val stageIds = jobIdToStageIds(jobId).toArray
         val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-        listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+        listenerBus.post(
+          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
         submitStage(finalStage)
       }
     }
@@ -965,7 +968,8 @@ class DAGScheduler(
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(stage)
                     cleanupStateForJobAndIndependentStages(job)
-                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
+                    listenerBus.post(
+                      SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
                   }
 
                   // taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1234,7 +1238,7 @@ class DAGScheduler(
     if (ableToCancelStages) {
       job.listener.jobFailed(error)
       cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4840d8b..e5d1eb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -59,6 +59,7 @@ case class SparkListenerTaskEnd(
 @DeveloperApi
 case class SparkListenerJobStart(
     jobId: Int,
+    time: Long,
     stageInfos: Seq[StageInfo],
     properties: Properties = null)
   extends SparkListenerEvent {
@@ -68,7 +69,11 @@ case class SparkListenerJobStart(
 }
 
 @DeveloperApi
-case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+case class SparkListenerJobEnd(
+    jobId: Int,
+    time: Long,
+    jobResult: JobResult)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 1d1c701..8121270 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -21,7 +21,6 @@ import scala.xml.{Node, NodeSeq}
 
 import javax.servlet.http.HttpServletRequest
 
-import org.apache.spark.JobExecutionStatus
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
@@ -51,13 +50,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
       val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
       val duration: Option[Long] = {
-        job.startTime.map { start =>
-          val end = job.endTime.getOrElse(System.currentTimeMillis())
+        job.submissionTime.map { start =>
+          val end = job.completionTime.getOrElse(System.currentTimeMillis())
           end - start
         }
       }
       val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
       val detailUrl =
         "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
       <tr>
@@ -68,7 +67,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
           <div><em>{lastStageDescription}</em></div>
           <a href={detailUrl}>{lastStageName}</a>
         </td>
-        <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+        <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
           {formattedSubmissionTime}
         </td>
         <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
@@ -101,11 +100,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       val now = System.currentTimeMillis
 
       val activeJobsTable =
-        jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+        jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
       val completedJobsTable =
-        jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+        jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
       val failedJobsTable =
-        jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+        jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
 
       val shouldShowActiveJobs = activeJobs.nonEmpty
       val shouldShowCompletedJobs = completedJobs.nonEmpty

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 72935be..b0d3bed 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -153,14 +153,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val jobData: JobUIData =
       new JobUIData(
         jobId = jobStart.jobId,
-        startTime = Some(System.currentTimeMillis),
-        endTime = None,
+        submissionTime = Option(jobStart.time).filter(_ >= 0),
         stageIds = jobStart.stageIds,
         jobGroup = jobGroup,
         status = JobExecutionStatus.RUNNING)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
-    // stages's transitive stage dependencies, but some of these stages might be skipped if their
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
     // output is available from earlier runs.
     // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     jobData.numTasks = {
@@ -186,7 +185,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
       new JobUIData(jobId = jobEnd.jobId)
     }
-    jobData.endTime = Some(System.currentTimeMillis())
+    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
@@ -309,7 +309,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
     // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
-    // compeletion event is for. Let's just drop it here. This means we might have some speculation
+    // completion event is for. Let's just drop it here. This means we might have some speculation
     // tasks on the web ui that's never marked as complete.
     if (info != null && taskEnd.stageAttemptId != -1) {
       val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 48fd7ca..01f7e23 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -40,15 +40,15 @@ private[jobs] object UIData {
 
   class JobUIData(
     var jobId: Int = -1,
-    var startTime: Option[Long] = None,
-    var endTime: Option[Long] = None,
+    var submissionTime: Option[Long] = None,
+    var completionTime: Option[Long] = None,
     var stageIds: Seq[Int] = Seq.empty,
     var jobGroup: Option[String] = None,
     var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
     /* Tasks */
     // `numTasks` is a potential underestimate of the true number of tasks that this job will run.
     // This may be an underestimate because the job start event references all of the result
-    // stages's transitive stage dependencies, but some of these stages might be skipped if their
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
     // output is available from earlier runs.
     // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
     var numTasks: Int = 0,

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee3756c..76709a2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,7 @@ import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
+import org.apache.hadoop.hdfs.web.JsonUtil
 
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
@@ -141,6 +142,7 @@ private[spark] object JsonProtocol {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> Utils.getFormattedClassName(jobStart)) ~
     ("Job ID" -> jobStart.jobId) ~
+    ("Submission Time" -> jobStart.time) ~
     ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)
@@ -150,6 +152,7 @@ private[spark] object JsonProtocol {
     val jobResult = jobResultToJson(jobEnd.jobResult)
     ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
     ("Job ID" -> jobEnd.jobId) ~
+    ("Completion Time" -> jobEnd.time) ~
     ("Job Result" -> jobResult)
   }
 
@@ -492,6 +495,8 @@ private[spark] object JsonProtocol {
 
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
     val jobId = (json \ "Job ID").extract[Int]
+    val submissionTime =
+      Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
     // The "Stage Infos" field was added in Spark 1.2.0
@@ -499,13 +504,15 @@ private[spark] object JsonProtocol {
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
         stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
       }
-    SparkListenerJobStart(jobId, stageInfos, properties)
+    SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
 
   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
     val jobId = (json \ "Job ID").extract[Int]
+    val completionTime =
+      Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
     val jobResult = jobResultFromJson(json \ "Job Result")
-    SparkListenerJobEnd(jobId, jobResult)
+    SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
   def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 24f41bf..0fb1bdd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -34,6 +34,8 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
+  val jobCompletionTime = 1421191296660L
+
   before {
     sc = new SparkContext("local", "SparkListenerSuite")
   }
@@ -44,7 +46,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
     bus.addListener(counter)
 
     // Listener bus hasn't started yet, so posting events should not increment counter
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
@@ -54,7 +56,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
 
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
 
     // Listener bus must not be started twice
@@ -99,7 +101,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
 
     bus.addListener(blockingListener)
     bus.start()
-    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
     // Listener should be blocked after start
@@ -345,7 +347,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
     bus.start()
 
     // Post events to all listeners, and wait until the queue is drained
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 
     // The exception should be caught, and the event should be propagated to other listeners

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index f865d8c..c9417ea 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.util.Utils
 
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
 
+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L
 
   private def createStageStartEvent(stageId: Int) = {
     val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
@@ -46,12 +48,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, stageInfos)
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
   }
 
   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
     val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
-    SparkListenerJobEnd(jobId, result)
+    SparkListenerJobEnd(jobId, jobCompletionTime, result)
   }
 
   private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e8422c52/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 71dfed1..db400b4 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -34,6 +34,9 @@ import org.apache.spark.storage._
 
 class JsonProtocolSuite extends FunSuite {
 
+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L
+
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -54,9 +57,9 @@ class JsonProtocolSuite extends FunSuite {
       val stageIds = Seq[Int](1, 2, 3, 4)
       val stageInfos = stageIds.map(x =>
         makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
-      SparkListenerJobStart(10, stageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     }
-    val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
+    val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
       "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
       "Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
@@ -247,13 +250,31 @@ class JsonProtocolSuite extends FunSuite {
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
     val dummyStageInfos =
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
-    val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+    val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
-      SparkListenerJobStart(10, dummyStageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
   }
 
+  test("SparkListenerJobStart and SparkListenerJobEnd backward compatibility") {
+    // Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
+    // Also, SparkListenerJobEnd did not have a "Completion Time" property.
+    val stageIds = Seq[Int](1, 2, 3, 4)
+    val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
+    val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
+    val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
+      .removeField({ _._1 == "Submission Time"})
+    val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
+    assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
+
+    val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
+    val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
+      .removeField({ _._1 == "Completion Time"})
+    val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
+    assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -1075,6 +1096,7 @@ class JsonProtocolSuite extends FunSuite {
       |{
       |  "Event": "SparkListenerJobStart",
       |  "Job ID": 10,
+      |  "Submission Time": 1421191042750,
       |  "Stage Infos": [
       |    {
       |      "Stage ID": 1,
@@ -1349,6 +1371,7 @@ class JsonProtocolSuite extends FunSuite {
       |{
       |  "Event": "SparkListenerJobEnd",
       |  "Job ID": 20,
+      |  "Completion Time": 1421191296660,
       |  "Job Result": {
       |    "Result": "JobSucceeded"
       |  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org