Posted to commits@spark.apache.org by me...@apache.org on 2014/09/30 06:53:25 UTC

git commit: Minor cleanup of code.

Repository: spark
Updated Branches:
  refs/heads/master dc30e4504 -> 210404a56


Minor cleanup of code.

Author: Reynold Xin <rx...@apache.org>

Closes #2581 from rxin/minor-cleanup and squashes the following commits:

736a91b [Reynold Xin] Minor cleanup of code.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/210404a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/210404a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/210404a5

Branch: refs/heads/master
Commit: 210404a56197ad347f1e621ed53ef01327fba2bd
Parents: dc30e45
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Sep 29 21:53:21 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Sep 29 21:53:21 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/JobLogger.scala  | 17 +-----
 .../org/apache/spark/util/JsonProtocol.scala    |  1 -
 .../scala/org/apache/spark/util/Utils.scala     | 60 +++++++++-----------
 3 files changed, 31 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/210404a5/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index ceb434f..54904bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -20,15 +20,12 @@ package org.apache.spark.scheduler
 import java.io.{File, FileNotFoundException, IOException, PrintWriter}
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.executor.TaskMetrics
 
 /**
  * :: DeveloperApi ::
@@ -62,24 +59,16 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
     override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   }
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
 
   createLogDir()
 
-  // The following 5 functions are used only in testing.
-  private[scheduler] def getLogDir = logDir
-  private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
-  private[scheduler] def getStageIdToJobId = stageIdToJobId
-  private[scheduler] def getJobIdToStageIds = jobIdToStageIds
-  private[scheduler] def getEventQueue = eventQueue
-
   /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (dir.exists()) {
       return
     }
-    if (dir.mkdirs() == false) {
+    if (!dir.mkdirs()) {
       // JobLogger should throw a exception rather than continue to construct this object.
       throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/")
     }
@@ -261,7 +250,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
   protected def recordJobProperties(jobId: Int, properties: Properties) {
     if (properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobId, description, false)
+      jobLogInfo(jobId, description, withTime = false)
     }
   }
 

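For readers skimming the hunk above: the call-site change to jobLogInfo(jobId, description, withTime = false) is the named-boolean-argument idiom. A minimal standalone sketch of the idea follows; only the parameter name withTime is taken from JobLogger, and the body is illustrative rather than the real logger, which writes to per-job PrintWriters.

    // Illustrative sketch, not the Spark code.
    // A bare boolean literal at a call site says nothing about its meaning;
    // naming the argument makes the intent readable without opening the definition.
    def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true): Unit = {
      val stamp = if (withTime) new java.util.Date().toString + " " else ""
      println(s"[job $jobId] $stamp$info")
    }

    jobLogInfo(1, "submitted", withTime = false)   // intent is clear at the call site
    // vs. jobLogInfo(1, "submitted", false)       // reader must look up what `false` toggles
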
http://git-wip-us.apache.org/repos/asf/spark/blob/210404a5/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6a48f67..5b2e7d3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -25,7 +25,6 @@ import scala.collection.Map
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
-import org.json4s.jackson.JsonMethods._
 
 
 import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,

http://git-wip-us.apache.org/repos/asf/spark/blob/210404a5/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 10d4408..dbe0cfa 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
-import org.apache.log4j.PropertyConfigurator
-
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
@@ -37,12 +35,12 @@ import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.PropertyConfigurator
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
@@ -86,7 +84,7 @@ private[spark] object Utils extends Logging {
     ois.readObject.asInstanceOf[T]
   }
 
-  /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
+  /** Deserialize a Long value (used for [[org.apache.spark.api.python.PythonPartitioner]]) */
   def deserializeLongValue(bytes: Array[Byte]) : Long = {
     // Note: we assume that we are given a Long value encoded in network (big-endian) byte order
     var result = bytes(7) & 0xFFL
@@ -153,7 +151,7 @@ private[spark] object Utils extends Logging {
   def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
 
   /**
-   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+   * Primitive often used when writing [[java.nio.ByteBuffer]] to [[java.io.DataOutput]]
    */
   def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
     if (bb.hasArray) {
@@ -333,7 +331,7 @@ private[spark] object Utils extends Logging {
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
-    val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
+    val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
@@ -355,7 +353,7 @@ private[spark] object Utils extends Logging {
         uc.connect()
         val in = uc.getInputStream()
         val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
+        Utils.copyStream(in, out, closeStreams = true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           if (fileOverwrite) {
             targetFile.delete()
@@ -402,7 +400,7 @@ private[spark] object Utils extends Logging {
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
+        Utils.copyStream(in, out, closeStreams = true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           if (fileOverwrite) {
             targetFile.delete()
@@ -666,7 +664,7 @@ private[spark] object Utils extends Logging {
    */
   def deleteRecursively(file: File) {
     if (file != null) {
-      if ((file.isDirectory) && !isSymlink(file)) {
+      if (file.isDirectory() && !isSymlink(file)) {
         for (child <- listFilesSafely(file)) {
           deleteRecursively(child)
         }
@@ -701,11 +699,7 @@ private[spark] object Utils extends Logging {
       new File(file.getParentFile().getCanonicalFile(), file.getName())
     }
 
-    if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
-      return false
-    } else {
-      return true
-    }
+    !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
   /**
@@ -804,7 +798,7 @@ private[spark] object Utils extends Logging {
         .start()
     new Thread("read stdout for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
           System.err.println(line)
         }
       }
@@ -818,8 +812,10 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a command and get its output, throwing an exception if it yields a code other than 0.
    */
-  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
-                          extraEnvironment: Map[String, String] = Map.empty): String = {
+  def executeAndGetOutput(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty): String = {
     val builder = new ProcessBuilder(command: _*)
         .directory(workingDir)
     val environment = builder.environment()
@@ -829,7 +825,7 @@ private[spark] object Utils extends Logging {
     val process = builder.start()
     new Thread("read stderr for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
           System.err.println(line)
         }
       }
@@ -837,7 +833,7 @@ private[spark] object Utils extends Logging {
     val output = new StringBuffer
     val stdoutThread = new Thread("read stdout for " + command(0)) {
       override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
           output.append(line)
         }
       }
@@ -846,8 +842,8 @@ private[spark] object Utils extends Logging {
     val exitCode = process.waitFor()
     stdoutThread.join()   // Wait for it to finish reading output
     if (exitCode != 0) {
-      logError(s"Process $command exited with code $exitCode: ${output}")
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+      logError(s"Process $command exited with code $exitCode: $output")
+      throw new SparkException(s"Process $command exited with code $exitCode")
     }
     output.toString
   }
@@ -860,6 +856,7 @@ private[spark] object Utils extends Logging {
     try {
       block
     } catch {
+      case e: ControlThrowable => throw e
       case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
     }
   }
@@ -884,13 +881,12 @@ private[spark] object Utils extends Logging {
    * @param skipClass Function that is used to exclude non-user-code classes.
    */
   def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
-    val trace = Thread.currentThread.getStackTrace()
-      .filterNot { ste:StackTraceElement =>
-        // When running under some profilers, the current stack trace might contain some bogus
-        // frames. This is intended to ensure that we don't crash in these situations by
-        // ignoring any frames that we can't examine.
-        (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
-      }
+    val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
+      // When running under some profilers, the current stack trace might contain some bogus
+      // frames. This is intended to ensure that we don't crash in these situations by
+      // ignoring any frames that we can't examine.
+      ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
+    }
 
     // Keep crawling up the stack trace until we find the first function not inside of the spark
     // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -924,7 +920,7 @@ private[spark] object Utils extends Logging {
     }
     val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
     CallSite(
-      shortForm = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
       longForm = callStack.take(callStackDepth).mkString("\n"))
   }
 
@@ -1027,7 +1023,7 @@ private[spark] object Utils extends Logging {
     false
   }
 
-  def isSpace(c: Char): Boolean = {
+  private def isSpace(c: Char): Boolean = {
     " \t\r\n".indexOf(c) != -1
   }
 
@@ -1179,7 +1175,7 @@ private[spark] object Utils extends Logging {
     }
     import scala.sys.process._
     (linkCmd + src.getAbsolutePath() + " " + dst.getPath() + cmdSuffix) lines_!
-      ProcessLogger(line => (logInfo(line)))
+    ProcessLogger(line => logInfo(line))
   }
 
 
@@ -1260,7 +1256,7 @@ private[spark] object Utils extends Logging {
     val startTime = System.currentTimeMillis
     while (!terminated) {
       try {
-        process.exitValue
+        process.exitValue()
         terminated = true
       } catch {
         case e: IllegalThreadStateException =>

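One behavioral note on the tryOrExit hunk above: the added "case e: ControlThrowable => throw e" keeps the catch-all from swallowing scala.util.control.ControlThrowable, which Scala uses to implement breaks and non-local returns. A minimal standalone sketch of why that matters; tryOrLog and its println handler are illustrative stand-ins, not the Spark code.

    import scala.util.control.{Breaks, ControlThrowable}

    // Illustrative sketch: a guarded catch-all in the style of the hunk above.
    def tryOrLog(block: => Unit): Unit = {
      try {
        block
      } catch {
        // Rethrow control-flow throwables so break/return still reach their target...
        case e: ControlThrowable => throw e
        // ...and only treat genuine failures as uncaught exceptions.
        case t: Throwable => println(s"uncaught: $t")
      }
    }

    val b = new Breaks
    b.breakable {
      tryOrLog { b.break() }   // without the guard, the break would be logged and lost
    }
    println("breakable exited normally")
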
