You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/05 03:21:34 UTC
[1/4] git commit: add javadoc to JobLogger, and some small format fixes
Updated Branches:
refs/heads/branch-0.8 7e00dee27 -> 518cf22eb
add javadoc to JobLogger, and some small format fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/079820f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/079820f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/079820f7
Branch: refs/heads/branch-0.8
Commit: 079820f77e0bca1e5de8c0e9641f5fd8d815c7c9
Parents: 3db505c
Author: Mingfei <mi...@intel.com>
Authored: Fri Nov 1 15:32:27 2013 +0800
Committer: Mingfei <mi...@intel.com>
Committed: Fri Nov 1 15:32:27 2013 +0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/JobLogger.scala | 161 ++++++++++++++++---
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
2 files changed, 136 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/079820f7/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 12b0d74..0ae6a50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,16 +24,19 @@ import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.StorageLevel
/**
- * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
- *
+ * <p>A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. <br>
+ * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
+ * after the SparkContext is created. <br>
+ * Note that each JobLogger only works for one SparkContext
* @param logDirName The base directory for the log files.
*/
class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -56,7 +59,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
private[scheduler] def getJobIDToStages = jobIDToStages
private[scheduler] def getEventQueue = eventQueue
- // Create a folder for log files, the folder's name is the creation time of the jobLogger
+ /** Create a folder for log files, the folder's name is the creation time of jobLogger */
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
if (!dir.exists() && !dir.mkdirs()) {
@@ -64,7 +67,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- // Create a log file for one job, the file name is the jobID
+ /**
+ * Create a log file for one job
+ * @param jobID ID of the job
+ * @return No return
+ * @exception FileNotFoundException Fail to create log file
+ */
protected def createLogWriter(jobID: Int) {
try {
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -74,8 +82,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- // Close log file, and clean the stage relationship in stageIDToJobID
- protected def closeLogWriter(jobID: Int) =
+ /**
+ * Close log file, and clean the stage relationship in stageIDToJobID
+ * @param jobID ID of the job
+ * @return No return
+ */
+ protected def closeLogWriter(jobID: Int) {
jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
fileWriter.close()
jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
@@ -84,9 +96,15 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
jobIDToPrintWriter -= jobID
jobIDToStages -= jobID
}
+ }
- // Write log information to log file, withTime parameter controls whether to recored
- // time stamp for the information
+ /**
+ * Write info into log file
+ * @param jobID ID of the job
+ * @param info Info to be recorded
+ * @param withTime Controls whether to record time stamp before the info, default is true
+ * @return No return
+ */
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
if (withTime) {
@@ -96,9 +114,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
}
- protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
+ /**
+ * Write info into log file
+ * @param stageID ID of the stage
+ * @param info Info to be recorded
+ * @param withTime Controls whether to record time stamp before the info, default is true
+ * @return No return
+ */
+ protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-
+ }
+
+ /**
+ * Build stage dependency for a job
+ * @param jobID ID of the job
+ * @param stage Root stage of the job
+ * @return No return
+ */
protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.jobId == jobID) {
jobIDToStages.get(jobID) match {
@@ -112,6 +144,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
+ /**
+ * Record stage dependency and RDD dependency for a stage
+ * @param jobID Job ID of the stage
+ * @return No return
+ */
protected def recordStageDep(jobID: Int) {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
var rddList = new ListBuffer[RDD[_]]
@@ -138,8 +175,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- // Generate indents and convert to String
- protected def indentString(indent: Int) = {
+ /**
+ * Generate indents and convert to String
+ * @param indent Number of indents
+ * @return string of indents
+ */
+ protected def indentString(indent: Int): String = {
val sb = new StringBuilder()
for (i <- 1 to indent) {
sb.append(" ")
@@ -147,16 +188,35 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
sb.toString()
}
- protected def getRddName(rdd: RDD[_]) = {
- var rddName = rdd.getClass.getName
+ /**
+ * Get RDD's name
+ * @param rdd Input RDD
+ * @return String of RDD's name
+ */
+ protected def getRddName(rdd: RDD[_]): String = {
+ var rddName = rdd.getClass.getSimpleName
if (rdd.name != null) {
rddName = rdd.name
}
rddName
}
+ /**
+ * Record RDD dependency graph in a stage
+ * @param jobID Job ID of the stage
+ * @param rdd Root RDD of the stage
+ * @param indent Indent number before info
+ * @return No return
+ */
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
- val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+ val rddInfo =
+ if (rdd.getStorageLevel != StorageLevel.NONE) {
+ "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
+ rdd.origin + " " + rdd.generator
+ } else {
+ "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
+ rdd.origin + " " + rdd.generator
+ }
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
@@ -166,7 +226,15 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+ /**
+ * Record stage dependency graph of a job
+ * @param jobID Job ID of the stage
+ * @param stage Root stage of the job
+ * @param indent Indent number before info, default is 0
+ * @return No return
+ */
+
+ protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
val stageInfo = if (stage.isShuffleMap) {
"STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
} else {
@@ -174,14 +242,24 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
- recordRddInStageGraph(jobID, stage.rdd, indent)
- stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+ if (!idSet.contains(stage.id)) {
+ idSet += stage.id
+ recordRddInStageGraph(jobID, stage.rdd, indent)
+ stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
+ }
} else {
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
}
}
- // Record task metrics into job log files
+ /**
+ * Record task metrics into job log files, including execution info and shuffle metrics
+ * @param stageID Stage ID of the task
+ * @param status Status info of the task
+ * @param taskInfo Task description info
+ * @param taskMetrics Task running metrics
+ * @return No return
+ */
protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
@@ -206,18 +284,33 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}
+ /**
+ * When stage is submitted, record stage submit info
+ * @param stageSubmitted Stage submitted event
+ * @return No return
+ */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
}
+ /**
+ * When stage is completed, record stage completion status
+ * @param stageCompleted Stage completed event
+ * @return No return
+ */
override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
stageCompleted.stage.stageId))
}
-
+
override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
+
+ /**
+ * When task ends, record task completion status and metrics
+ * @param taskEnd Task end event
+ * @return No return
+ */
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val task = taskEnd.task
val taskInfo = taskEnd.taskInfo
@@ -246,6 +339,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
+ /**
+ * When job ends, recording job completion status and close log file
+ * @param jobEnd Job end event
+ * @return No return
+ */
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
var info = "JOB_ID=" + job.jobId
@@ -259,14 +357,25 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.jobId)
}
-
+
+ /**
+ * Record job properties into job log file
+ * @param jobID ID of the job
+ * @param properties Properties of the job
+ * @return No return
+ */
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
jobLogInfo(jobID, description, false)
}
}
-
+
+ /**
+ * When job starts, record job property and stage graph
+ * @param jobStart Job start event
+ * @return No return
+ */
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
val properties = jobStart.properties
@@ -274,7 +383,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
recordJobProperties(job.jobId, properties)
buildJobDep(job.jobId, job.finalStage)
recordStageDep(job.jobId)
- recordStageDepGraph(job.jobId, job.finalStage)
+ recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/079820f7/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 8406093..7d7ca9b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -65,7 +65,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val rootStageInfo = new StageInfo(rootStage)
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
- joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+ joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
joblogger.createLogWriterTest(jobID)
[3/4] git commit: delete @return if method has no return value
Posted by rx...@apache.org.
delete @return tags from methods that have no return value
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7374376f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7374376f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7374376f
Branch: refs/heads/branch-0.8
Commit: 7374376f55015c6ba5d4b7fd551aebcb83b1d041
Parents: 4ca72c8
Author: Mingfei <mi...@intel.com>
Authored: Tue Nov 5 08:44:32 2013 +0800
Committer: Mingfei <mi...@intel.com>
Committed: Tue Nov 5 08:44:32 2013 +0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/JobLogger.scala | 16 ----------------
1 file changed, 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7374376f/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1d5e6c..94f8b01 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -70,7 +70,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* Create a log file for one job
* @param jobID ID of the job
- * @return No return
* @exception FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
@@ -85,7 +84,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* Close log file, and clean the stage relationship in stageIDToJobID
* @param jobID ID of the job
- * @return No return
*/
protected def closeLogWriter(jobID: Int) {
jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
@@ -103,7 +101,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* @param jobID ID of the job
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
- * @return No return
*/
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
@@ -119,7 +116,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* @param stageID ID of the stage
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
- * @return No return
*/
protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
@@ -129,7 +125,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* Build stage dependency for a job
* @param jobID ID of the job
* @param stage Root stage of the job
- * @return No return
*/
protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.jobId == jobID) {
@@ -147,7 +142,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* Record stage dependency and RDD dependency for a stage
* @param jobID Job ID of the stage
- * @return No return
*/
protected def recordStageDep(jobID: Int) {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
@@ -206,7 +200,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* @param jobID Job ID of the stage
* @param rdd Root RDD of the stage
* @param indent Indent number before info
- * @return No return
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo =
@@ -231,9 +224,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* @param jobID Job ID of the stage
* @param stage Root stage of the job
* @param indent Indent number before info, default is 0
- * @return No return
*/
-
protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
val stageInfo = if (stage.isShuffleMap) {
"STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
@@ -258,7 +249,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* @param status Status info of the task
* @param taskInfo Task description info
* @param taskMetrics Task running metrics
- * @return No return
*/
protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
@@ -287,7 +277,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* When stage is submitted, record stage submit info
* @param stageSubmitted Stage submitted event
- * @return No return
*/
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
@@ -297,7 +286,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* When stage is completed, record stage completion status
* @param stageCompleted Stage completed event
- * @return No return
*/
override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
@@ -309,7 +297,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* When task ends, record task completion status and metrics
* @param taskEnd Task end event
- * @return No return
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val task = taskEnd.task
@@ -342,7 +329,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* When job ends, recording job completion status and close log file
* @param jobEnd Job end event
- * @return No return
*/
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
@@ -362,7 +348,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
* Record job properties into job log file
* @param jobID ID of the job
* @param properties Properties of the job
- * @return No return
*/
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
@@ -374,7 +359,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
/**
* When job starts, record job property and stage graph
* @param jobStart Job start event
- * @return No return
*/
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
[2/4] git commit: modify according to comments
Posted by rx...@apache.org.
modify according to review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4ca72c84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4ca72c84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4ca72c84
Branch: refs/heads/branch-0.8
Commit: 4ca72c849519395461d581bb34b61b4a078738f8
Parents: 079820f
Author: Mingfei <mi...@intel.com>
Authored: Mon Nov 4 15:19:04 2013 +0800
Committer: Mingfei <mi...@intel.com>
Committed: Mon Nov 4 15:19:04 2013 +0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/JobLogger.scala | 136 +++++++++----------
1 file changed, 68 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4ca72c84/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 0ae6a50..a1d5e6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -32,10 +32,10 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.StorageLevel
/**
- * <p>A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. <br>
+ * A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
* JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
- * after the SparkContext is created. <br>
+ * after the SparkContext is created.
* Note that each JobLogger only works for one SparkContext
* @param logDirName The base directory for the log files.
*/
@@ -68,10 +68,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Create a log file for one job
- * @param jobID ID of the job
- * @return No return
- * @exception FileNotFoundException Fail to create log file
+ * Create a log file for one job
+ * @param jobID ID of the job
+ * @return No return
+ * @exception FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
try {
@@ -83,9 +83,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Close log file, and clean the stage relationship in stageIDToJobID
- * @param jobID ID of the job
- * @return No return
+ * Close log file, and clean the stage relationship in stageIDToJobID
+ * @param jobID ID of the job
+ * @return No return
*/
protected def closeLogWriter(jobID: Int) {
jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
@@ -99,11 +99,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Write info into log file
- * @param jobID ID of the job
- * @param info Info to be recorded
- * @param withTime Controls whether to record time stamp before the info, default is true
- * @return No return
+ * Write info into log file
+ * @param jobID ID of the job
+ * @param info Info to be recorded
+ * @param withTime Controls whether to record time stamp before the info, default is true
+ * @return No return
*/
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
@@ -115,21 +115,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Write info into log file
- * @param stageID ID of the stage
- * @param info Info to be recorded
- * @param withTime Controls whether to record time stamp before the info, default is true
- * @return No return
+ * Write info into log file
+ * @param stageID ID of the stage
+ * @param info Info to be recorded
+ * @param withTime Controls whether to record time stamp before the info, default is true
+ * @return No return
*/
protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
}
/**
- * Build stage dependency for a job
- * @param jobID ID of the job
- * @param stage Root stage of the job
- * @return No return
+ * Build stage dependency for a job
+ * @param jobID ID of the job
+ * @param stage Root stage of the job
+ * @return No return
*/
protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.jobId == jobID) {
@@ -145,9 +145,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Record stage dependency and RDD dependency for a stage
- * @param jobID Job ID of the stage
- * @return No return
+ * Record stage dependency and RDD dependency for a stage
+ * @param jobID Job ID of the stage
+ * @return No return
*/
protected def recordStageDep(jobID: Int) {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
@@ -176,9 +176,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Generate indents and convert to String
- * @param indent Number of indents
- * @return string of indents
+ * Generate indents and convert to String
+ * @param indent Number of indents
+ * @return string of indents
*/
protected def indentString(indent: Int): String = {
val sb = new StringBuilder()
@@ -189,9 +189,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Get RDD's name
- * @param rdd Input RDD
- * @return String of RDD's name
+ * Get RDD's name
+ * @param rdd Input RDD
+ * @return String of RDD's name
*/
protected def getRddName(rdd: RDD[_]): String = {
var rddName = rdd.getClass.getSimpleName
@@ -202,11 +202,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Record RDD dependency graph in a stage
- * @param jobID Job ID of the stage
- * @param rdd Root RDD of the stage
- * @param indent Indent number before info
- * @return No return
+ * Record RDD dependency graph in a stage
+ * @param jobID Job ID of the stage
+ * @param rdd Root RDD of the stage
+ * @param indent Indent number before info
+ * @return No return
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo =
@@ -227,11 +227,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Record stage dependency graph of a job
- * @param jobID Job ID of the stage
- * @param stage Root stage of the job
- * @param indent Indent number before info, default is 0
- * @return No return
+ * Record stage dependency graph of a job
+ * @param jobID Job ID of the stage
+ * @param stage Root stage of the job
+ * @param indent Indent number before info, default is 0
+ * @return No return
*/
protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
@@ -253,12 +253,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Record task metrics into job log files, including execution info and shuffle metrics
- * @param stageID Stage ID of the task
- * @param status Status info of the task
- * @param taskInfo Task description info
- * @param taskMetrics Task running metrics
- * @return No return
+ * Record task metrics into job log files, including execution info and shuffle metrics
+ * @param stageID Stage ID of the task
+ * @param status Status info of the task
+ * @param taskInfo Task description info
+ * @param taskMetrics Task running metrics
+ * @return No return
*/
protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
@@ -285,9 +285,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * When stage is submitted, record stage submit info
- * @param stageSubmitted Stage submitted event
- * @return No return
+ * When stage is submitted, record stage submit info
+ * @param stageSubmitted Stage submitted event
+ * @return No return
*/
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
@@ -295,9 +295,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * When stage is completed, record stage completion status
- * @param stageCompleted Stage completed event
- * @return No return
+ * When stage is completed, record stage completion status
+ * @param stageCompleted Stage completed event
+ * @return No return
*/
override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
@@ -307,9 +307,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart) { }
/**
- * When task ends, record task completion status and metrics
- * @param taskEnd Task end event
- * @return No return
+ * When task ends, record task completion status and metrics
+ * @param taskEnd Task end event
+ * @return No return
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val task = taskEnd.task
@@ -340,9 +340,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * When job ends, recording job completion status and close log file
- * @param jobEnd Job end event
- * @return No return
+ * When job ends, recording job completion status and close log file
+ * @param jobEnd Job end event
+ * @return No return
*/
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
@@ -359,10 +359,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * Record job properties into job log file
- * @param jobID ID of the job
- * @param properties Properties of the job
- * @return No return
+ * Record job properties into job log file
+ * @param jobID ID of the job
+ * @param properties Properties of the job
+ * @return No return
*/
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
@@ -372,9 +372,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
/**
- * When job starts, record job property and stage graph
- * @param jobStart Job start event
- * @return No return
+ * When job starts, record job property and stage graph
+ * @param jobStart Job start event
+ * @return No return
*/
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
[4/4] git commit: Merge pull request #128 from
shimingfei/joblogger-doc
Posted by rx...@apache.org.
Merge pull request #128 from shimingfei/joblogger-doc
add javadoc to JobLogger, and some small fixes
against SPARK-941
add javadoc to JobLogger, output more info for RDD, modify recordStageDepGraph to avoid outputting duplicate stage dependency information
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/518cf22e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/518cf22e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/518cf22e
Branch: refs/heads/branch-0.8
Commit: 518cf22eb2436d019e4f7087a38080ad4a20df58
Parents: 7e00dee 7374376
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Nov 4 18:21:27 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Nov 4 18:21:27 2013 -0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/JobLogger.scala | 143 +++++++++++++++----
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
2 files changed, 119 insertions(+), 26 deletions(-)
----------------------------------------------------------------------