Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/31 03:28:11 UTC

spark git commit: [SPARK-12561] Remove JobLogger in Spark 2.0.

Repository: spark
Updated Branches:
  refs/heads/master 9140d9074 -> be33a0cd3


[SPARK-12561] Remove JobLogger in Spark 2.0.

It was research code and has been deprecated since 1.0.0. No one really uses it since they can just use event logging.

Author: Reynold Xin <rx...@databricks.com>

Closes #10530 from rxin/SPARK-12561.
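
For anyone still depending on JobLogger, the replacement the commit message points at is the built-in event log. A minimal sketch of enabling it, assuming a placeholder application name and log directory that are not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Enable the event log that supersedes JobLogger (see the NOTE in the removed
// class doc below). The app name and directory are illustrative values only.
val conf = new SparkConf()
  .setAppName("event-logging-example")             // placeholder name
  .set("spark.eventLog.enabled", "true")           // documented replacement for JobLogger
  .set("spark.eventLog.dir", "/tmp/spark-events")  // assumed directory; point at your own event-log location
val sc = new SparkContext(conf)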


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be33a0cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be33a0cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be33a0cd

Branch: refs/heads/master
Commit: be33a0cd3def86e0aa64dab411e504abbbdfb03c
Parents: 9140d90
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Dec 30 18:28:08 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 30 18:28:08 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/JobLogger.scala  | 277 -------------------
 1 file changed, 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be33a0cd/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
deleted file mode 100644
index f96eb8c..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.io.{File, FileNotFoundException, IOException, PrintWriter}
-import java.text.SimpleDateFormat
-import java.util.{Date, Properties}
-
-import scala.collection.mutable.HashMap
-
-import org.apache.spark._
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
-
-/**
- * :: DeveloperApi ::
- * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * for each Spark job, containing task start/stop and shuffle information. JobLogger is a subclass
- * of SparkListener; use addSparkListener to add a JobLogger to a SparkContext after the
- * SparkContext is created. Note that each JobLogger only works for one SparkContext.
- *
- * NOTE: The functionality of this class is heavily stripped down to accommodate a general
- * refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced
- * to log application information as SparkListenerEvents. To enable this functionality, set
- * spark.eventLog.enabled to true.
- */
-@DeveloperApi
-@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
-class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {
-
-  def this() = this(System.getProperty("user.name", "<unknown>"),
-    String.valueOf(System.currentTimeMillis()))
-
-  private val logDir =
-    if (System.getenv("SPARK_LOG_DIR") != null) {
-      System.getenv("SPARK_LOG_DIR")
-    } else {
-      "/tmp/spark-%s".format(user)
-    }
-
-  private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
-  private val stageIdToJobId = new HashMap[Int, Int]
-  private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
-  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-  }
-
-  createLogDir()
-
-  /** Create a folder for log files; the folder's name is the creation time of the JobLogger. */
-  protected def createLogDir() {
-    val dir = new File(logDir + "/" + logDirName + "/")
-    if (dir.exists()) {
-      return
-    }
-    if (!dir.mkdirs()) {
-      // JobLogger should throw an exception rather than continue to construct this object.
-      throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/")
-    }
-  }
-
-  /**
-   * Create a log file for one job
-   * @param jobId ID of the job
-   * @throws FileNotFoundException if the log file cannot be created
-   */
-  protected def createLogWriter(jobId: Int) {
-    try {
-      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobId)
-      jobIdToPrintWriter += (jobId -> fileWriter)
-    } catch {
-      case e: FileNotFoundException => e.printStackTrace()
-    }
-  }
-
-  /**
-   * Close the log file and clean up the stage relationships in stageIdToJobId
-   * @param jobId ID of the job
-   */
-  protected def closeLogWriter(jobId: Int) {
-    jobIdToPrintWriter.get(jobId).foreach { fileWriter =>
-      fileWriter.close()
-      jobIdToStageIds.get(jobId).foreach(_.foreach { stageId =>
-        stageIdToJobId -= stageId
-      })
-      jobIdToPrintWriter -= jobId
-      jobIdToStageIds -= jobId
-    }
-  }
-
-  /**
-   * Build up the maps that represent stage-job relationships
-   * @param jobId ID of the job
-   * @param stageIds IDs of the associated stages
-   */
-  protected def buildJobStageDependencies(jobId: Int, stageIds: Seq[Int]) = {
-    jobIdToStageIds(jobId) = stageIds
-    stageIds.foreach { stageId => stageIdToJobId(stageId) = jobId }
-  }
-
-  /**
-   * Write info into log file
-   * @param jobId ID of the job
-   * @param info Info to be recorded
-   * @param withTime Controls whether to record a timestamp before the info; default is true
-   */
-  protected def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true) {
-    var writeInfo = info
-    if (withTime) {
-      val date = new Date(System.currentTimeMillis())
-      writeInfo = dateFormat.get.format(date) + ": " + info
-    }
-    // scalastyle:off println
-    jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
-    // scalastyle:on println
-  }
-
-  /**
-   * Write info into log file
-   * @param stageId ID of the stage
-   * @param info Info to be recorded
-   * @param withTime Controls whether to record a timestamp before the info; default is true
-   */
-  protected def stageLogInfo(stageId: Int, info: String, withTime: Boolean = true) {
-    stageIdToJobId.get(stageId).foreach(jobId => jobLogInfo(jobId, info, withTime))
-  }
-
-  /**
-   * Record task metrics into job log files, including execution info and shuffle metrics
-   * @param stageId Stage ID of the task
-   * @param status Status info of the task
-   * @param taskInfo Task description info
-   * @param taskMetrics Task running metrics
-   */
-  protected def recordTaskMetrics(stageId: Int, status: String,
-                                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
-    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageId +
-               " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
-               " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
-    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val gcTime = " GC_TIME=" + taskMetrics.jvmGCTime
-    val inputMetrics = taskMetrics.inputMetrics match {
-      case Some(metrics) =>
-        " READ_METHOD=" + metrics.readMethod.toString +
-        " INPUT_BYTES=" + metrics.bytesRead
-      case None => ""
-    }
-    val outputMetrics = taskMetrics.outputMetrics match {
-      case Some(metrics) =>
-        " OUTPUT_BYTES=" + metrics.bytesWritten
-      case None => ""
-    }
-    val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
-      case Some(metrics) =>
-        " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
-        " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
-        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
-        " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
-        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
-        " LOCAL_BYTES_READ=" + metrics.localBytesRead
-      case None => ""
-    }
-    val writeMetrics = taskMetrics.shuffleWriteMetrics match {
-      case Some(metrics) =>
-        " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten +
-        " SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
-      case None => ""
-    }
-    stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
-      shuffleReadMetrics + writeMetrics)
-  }
-
-  /**
-   * When stage is submitted, record stage submit info
-   * @param stageSubmitted Stage submitted event
-   */
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-    val stageInfo = stageSubmitted.stageInfo
-    stageLogInfo(stageInfo.stageId, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
-      stageInfo.stageId, stageInfo.numTasks))
-  }
-
-  /**
-   * When stage is completed, record stage completion status
-   * @param stageCompleted Stage completed event
-   */
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-    val stageId = stageCompleted.stageInfo.stageId
-    if (stageCompleted.stageInfo.failureReason.isEmpty) {
-      stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=COMPLETED")
-    } else {
-      stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=FAILED")
-    }
-  }
-
-  /**
-   * When task ends, record task completion status and metrics
-   * @param taskEnd Task end event
-   */
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val taskInfo = taskEnd.taskInfo
-    var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
-    val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty
-    taskEnd.reason match {
-      case Success => taskStatus += " STATUS=SUCCESS"
-        recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
-      case Resubmitted =>
-        taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
-                      " STAGE_ID=" + taskEnd.stageId
-        stageLogInfo(taskEnd.stageId, taskStatus)
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
-        taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
-                      taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
-                      mapId + " REDUCE_ID=" + reduceId
-        stageLogInfo(taskEnd.stageId, taskStatus)
-      case _ =>
-    }
-  }
-
-  /**
-   * When a job ends, record the job completion status and close the log file
-   * @param jobEnd Job end event
-   */
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    val jobId = jobEnd.jobId
-    var info = "JOB_ID=" + jobId
-    jobEnd.jobResult match {
-      case JobSucceeded => info += " STATUS=SUCCESS"
-      case JobFailed(exception) =>
-        info += " STATUS=FAILED REASON="
-        exception.getMessage.split("\\s+").foreach(info += _ + "_")
-      case _ =>
-    }
-    jobLogInfo(jobId, info.substring(0, info.length - 1).toUpperCase)
-    closeLogWriter(jobId)
-  }
-
-  /**
-   * Record job properties into job log file
-   * @param jobId ID of the job
-   * @param properties Properties of the job
-   */
-  protected def recordJobProperties(jobId: Int, properties: Properties) {
-    if (properties != null) {
-      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobId, description, withTime = false)
-    }
-  }
-
-  /**
-   * When a job starts, record the job properties and the stage graph
-   * @param jobStart Job start event
-   */
-  override def onJobStart(jobStart: SparkListenerJobStart) {
-    val jobId = jobStart.jobId
-    val properties = jobStart.properties
-    createLogWriter(jobId)
-    recordJobProperties(jobId, properties)
-    buildJobStageDependencies(jobId, jobStart.stageIds)
-    jobLogInfo(jobId, "JOB_ID=" + jobId + " STATUS=STARTED")
-  }
-}
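
If you relied on JobLogger's per-job callbacks rather than the event log, the same hooks remain available on SparkListener, which the removed class extended. A hedged sketch of a hand-rolled stand-in, registered the way the old class doc suggested via addSparkListener (the listener class name and log format below are illustrative, not anything shipped with Spark):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Illustrative stand-in for the removed JobLogger: prints job start/end to stdout.
class SimpleJobListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"JOB_ID=${jobStart.jobId} STATUS=STARTED")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"JOB_ID=${jobEnd.jobId} RESULT=${jobEnd.jobResult}")
}

val sc = new SparkContext(new SparkConf().setAppName("listener-example"))  // placeholder app name
sc.addSparkListener(new SimpleJobListener)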

