Posted to commits@spark.apache.org by li...@apache.org on 2018/03/07 21:42:10 UTC

spark git commit: [SPARK-23550][CORE] Cleanup `Utils`.

Repository: spark
Updated Branches:
  refs/heads/master 53561d27c -> c99fc9ad9


[SPARK-23550][CORE] Cleanup `Utils`.

A few different things going on:
- Remove unused methods.
- Move JSON methods to the only class that uses them.
- Move test-only methods to TestUtils.
- Make getMaxResultSize() a config constant (see the sketch after this list).
- Reuse functionality from existing libraries (JRE or JavaUtils) where possible.
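
For context, here is a minimal sketch of the config-constant pattern the
getMaxResultSize() bullet refers to, condensed from the ConfigBuilder,
package.scala and Executor hunks further down. It is illustrative only: it
assumes code living under Spark's own org.apache.spark package tree
(ConfigBuilder and the typed SparkConf.get are private[spark]), and it uses a
hypothetical key so it does not collide with the real
spark.driver.maxResultSize entry this patch adds.

  // Sketch only: a typed config entry declared once and read by callers.
  package org.apache.spark.internal.config.example

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
  import org.apache.spark.network.util.ByteUnit

  object MaxResultSizeSketch {
    // Hypothetical key; the patch itself registers spark.driver.maxResultSize
    // in org.apache.spark.internal.config.package (see the hunk below).
    val EXAMPLE_MAX_RESULT_SIZE: ConfigEntry[Long] =
      ConfigBuilder("spark.example.maxResultSize")
        .doc("Size limit for results.")
        .bytesConf(ByteUnit.BYTE)          // "1g", "512m", ... parse to bytes
        .createWithDefaultString("1g")

    // Callers read the typed entry through SparkConf instead of the removed
    // Utils.getMaxResultSize(conf) helper.
    def maxResultSize(conf: SparkConf): Long = conf.get(EXAMPLE_MAX_RESULT_SIZE)
  }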

The change also updates a few tests to call `Utils.createTempDir` correctly,
so that temp dirs are created under the designated top-level temp dir instead of
potentially polluting the git index.
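
Similarly, a minimal sketch of the createTempDir fix described above, matching
the CheckpointSuite and MapWithStateSuite hunks below. The first parameter of
`Utils.createTempDir` is the parent (root) directory, so a bare prefix string
has to be passed as namePrefix or it is treated as a relative root path; as
with the sketch above, this assumes code under the org.apache.spark tree,
since Utils is private[spark].

  // Sketch only: passing the prefix by name keeps the default root
  // (java.io.tmpdir) instead of creating ./checkpoint in the checkout.
  package org.apache.spark.example

  import java.io.File

  import org.apache.spark.util.Utils

  object TempDirSketch {
    def tempCheckpointDir(): File = {
      // Wrong: "checkpoint" binds to `root`, so the temp dir lands under a
      // relative ./checkpoint path and can pollute the git working tree:
      //   Utils.createTempDir("checkpoint")

      // Right, as the updated suites below now do:
      Utils.createTempDir(namePrefix = "checkpoint")
    }
  }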

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20706 from vanzin/SPARK-23550.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c99fc9ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c99fc9ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c99fc9ad

Branch: refs/heads/master
Commit: c99fc9ad9b600095baba003053dbf84304ca392b
Parents: 53561d2
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Mar 7 13:42:06 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Mar 7 13:42:06 2018 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/TestUtils.scala |  26 +++-
 .../spark/deploy/SparkSubmitArguments.scala     |   4 +-
 .../org/apache/spark/executor/Executor.scala    |   4 +-
 .../spark/internal/config/ConfigBuilder.scala   |   3 +-
 .../apache/spark/internal/config/package.scala  |   5 +
 .../apache/spark/scheduler/TaskSetManager.scala |   3 +-
 .../org/apache/spark/util/JsonProtocol.scala    | 124 ++++++++++--------
 .../scala/org/apache/spark/util/Utils.scala     | 131 +------------------
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |   2 +-
 .../scala/org/apache/spark/DriverSuite.scala    |   2 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  18 +--
 .../spark/scheduler/ReplayListenerSuite.scala   |  12 +-
 .../org/apache/spark/util/UtilsSuite.scala      |   1 +
 scalastyle-config.xml                           |   2 +-
 .../datasources/orc/OrcSourceSuite.scala        |   4 +-
 .../execution/metric/SQLMetricsTestUtils.scala  |   6 +-
 .../sql/sources/PartitionedWriteSuite.scala     |  15 ++-
 .../thriftserver/HiveThriftServer2Suites.scala  |   2 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  20 +--
 .../spark/streaming/CheckpointSuite.scala       |   2 +-
 .../spark/streaming/MapWithStateSuite.scala     |   2 +-
 21 files changed, 152 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 93e7ee3..b5c4c70 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,7 +22,7 @@ import java.net.{HttpURLConnection, URI, URL}
 import java.nio.charset.StandardCharsets
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
-import java.util.Arrays
+import java.util.{Arrays, Properties}
 import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
@@ -35,6 +35,7 @@ import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.log4j.PropertyConfigurator
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
@@ -256,6 +257,29 @@ private[spark] object TestUtils {
       s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
   }
 
+  /**
+   * config a log4j properties used for testsuite
+   */
+  def configTestLog4j(level: String): Unit = {
+    val pro = new Properties()
+    pro.put("log4j.rootLogger", s"$level, console")
+    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
+    pro.put("log4j.appender.console.target", "System.err")
+    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
+    pro.put("log4j.appender.console.layout.ConversionPattern",
+      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
+    PropertyConfigurator.configure(pro)
+  }
+
+  /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val current = f.listFiles
+    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9db7a1f..e7796d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayOutputStream, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream}
 import java.lang.reflect.InvocationTargetException
 import java.net.URI
 import java.nio.charset.StandardCharsets
@@ -233,7 +233,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     // Set name from main class if not given
     name = Option(name).orElse(Option(mainClass)).orNull
     if (name == null && primaryResource != null) {
-      name = Utils.stripDirectory(primaryResource)
+      name = new File(primaryResource).getName()
     }
 
     // Action should be SUBMIT unless otherwise specified

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2c3a8ef..dcec3ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
@@ -141,8 +142,7 @@ private[spark] class Executor(
     conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
     RpcUtils.maxMessageSizeBytes(conf))
 
-  // Limit of bytes for total size of results (default is 1GB)
-  private val maxResultSize = Utils.getMaxResultSize(conf)
+  private val maxResultSize = conf.get(MAX_RESULT_SIZE)
 
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index b0cd711..f27aca0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -23,6 +23,7 @@ import java.util.regex.PatternSyntaxException
 import scala.util.matching.Regex
 
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.util.Utils
 
 private object ConfigHelpers {
 
@@ -45,7 +46,7 @@ private object ConfigHelpers {
   }
 
   def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
-    str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+    Utils.stringToSeq(str).map(converter)
   }
 
   def seqToString[T](v: Seq[T], stringConverter: T => String): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbfcfba..a313ad0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -520,4 +520,9 @@ package object config {
       .checkValue(v => v > 0, "The threshold should be positive.")
       .createWithDefault(10000000)
 
+  private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
+    .doc("Size limit for results.")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("1g")
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 886c2c9..d958658 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -64,8 +64,7 @@ private[spark] class TaskSetManager(
   val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
   val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
 
-  // Limit of bytes for total size of results (default is 1GB)
-  val maxResultSize = Utils.getMaxResultSize(conf)
+  val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
   val speculationEnabled = conf.getBoolean("spark.speculation", false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ff83301..40383fe 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -48,7 +48,7 @@ import org.apache.spark.storage._
  * To ensure that we provide these guarantees, follow these rules when modifying these methods:
  *
  *  - Never delete any JSON fields.
- *  - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields
+ *  - Any new JSON fields should be optional; use `jsonOption` when reading these fields
  *    in `*FromJson` methods.
  */
 private[spark] object JsonProtocol {
@@ -408,7 +408,7 @@ private[spark] object JsonProtocol {
         ("Loss Reason" -> reason.map(_.toString))
       case taskKilled: TaskKilled =>
         ("Kill Reason" -> taskKilled.reason)
-      case _ => Utils.emptyJson
+      case _ => emptyJson
     }
     ("Reason" -> reason) ~ json
   }
@@ -422,7 +422,7 @@ private[spark] object JsonProtocol {
   def jobResultToJson(jobResult: JobResult): JValue = {
     val result = Utils.getFormattedClassName(jobResult)
     val json = jobResult match {
-      case JobSucceeded => Utils.emptyJson
+      case JobSucceeded => emptyJson
       case jobFailed: JobFailed =>
         JObject("Exception" -> exceptionToJson(jobFailed.exception))
     }
@@ -573,7 +573,7 @@ private[spark] object JsonProtocol {
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
     val stageAttemptId =
-      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
@@ -586,7 +586,7 @@ private[spark] object JsonProtocol {
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
     val stageAttemptId =
-      Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -597,11 +597,11 @@ private[spark] object JsonProtocol {
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
     val jobId = (json \ "Job ID").extract[Int]
     val submissionTime =
-      Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
+      jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
     // The "Stage Infos" field was added in Spark 1.2.0
-    val stageInfos = Utils.jsonOption(json \ "Stage Infos")
+    val stageInfos = jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
         stageIds.map { id =>
           new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
@@ -613,7 +613,7 @@ private[spark] object JsonProtocol {
   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
     val jobId = (json \ "Job ID").extract[Int]
     val completionTime =
-      Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
+      jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
     val jobResult = jobResultFromJson(json \ "Job Result")
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
@@ -630,15 +630,15 @@ private[spark] object JsonProtocol {
   def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
     val maxMem = (json \ "Maximum Memory").extract[Long]
-    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
-    val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+    val maxOnHeapMem = jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+    val maxOffHeapMem = jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
     SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
   }
 
   def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
     SparkListenerBlockManagerRemoved(time, blockManagerId)
   }
 
@@ -648,11 +648,11 @@ private[spark] object JsonProtocol {
 
   def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
     val appName = (json \ "App Name").extract[String]
-    val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
+    val appId = jsonOption(json \ "App ID").map(_.extract[String])
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
-    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
-    val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
+    val appAttemptId = jsonOption(json \ "App Attempt ID").map(_.extract[String])
+    val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
     SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
   }
 
@@ -703,19 +703,19 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
-    val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+    val attemptId = jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
-    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+    val parentIds = jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
-    val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
-    val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
-    val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
-    val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
+    val details = jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
+    val submissionTime = jsonOption(json \ "Submission Time").map(_.extract[Long])
+    val completionTime = jsonOption(json \ "Completion Time").map(_.extract[Long])
+    val failureReason = jsonOption(json \ "Failure Reason").map(_.extract[String])
     val accumulatedValues = {
-      Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+      jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
         case Some(values) => values.map(accumulableInfoFromJson)
         case None => Seq.empty[AccumulableInfo]
       }
@@ -735,17 +735,17 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
-    val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
+    val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String].intern()
     val host = (json \ "Host").extract[String].intern()
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
-    val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
+    val speculative = jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
-    val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
+    val killed = jsonOption(json \ "Killed").exists(_.extract[Boolean])
+    val accumulables = jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
       case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq.empty[AccumulableInfo]
     }
@@ -762,13 +762,13 @@ private[spark] object JsonProtocol {
 
   def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
     val id = (json \ "ID").extract[Long]
-    val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
-    val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
-    val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
-    val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
+    val name = jsonOption(json \ "Name").map(_.extract[String])
+    val update = jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
+    val value = jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
+    val internal = jsonOption(json \ "Internal").exists(_.extract[Boolean])
     val countFailedValues =
-      Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
-    val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
+      jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+    val metadata = jsonOption(json \ "Metadata").map(_.extract[String])
     new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
   }
 
@@ -821,49 +821,49 @@ private[spark] object JsonProtocol {
     metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
 
     // Shuffle read metrics
-    Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
+    jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
       val readMetrics = metrics.createTempShuffleReadMetrics()
       readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
       readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
       readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
-      Utils.jsonOption(readJson \ "Remote Bytes Read To Disk")
+      jsonOption(readJson \ "Remote Bytes Read To Disk")
         .foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extract[Long])}
       readMetrics.incLocalBytesRead(
-        Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
       readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
       readMetrics.incRecordsRead(
-        Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
     }
 
     // Shuffle write metrics
     // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
-    Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
+    jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
       val writeMetrics = metrics.shuffleWriteMetrics
       writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
       writeMetrics.incRecordsWritten(
-        Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
       writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
     }
 
     // Output metrics
-    Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
+    jsonOption(json \ "Output Metrics").foreach { outJson =>
       val outputMetrics = metrics.outputMetrics
       outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
       outputMetrics.setRecordsWritten(
-        Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Input metrics
-    Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
+    jsonOption(json \ "Input Metrics").foreach { inJson =>
       val inputMetrics = metrics.inputMetrics
       inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
       inputMetrics.incRecordsRead(
-        Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
     }
 
     // Updated blocks
-    Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
+    jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
       metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson =>
         val id = BlockId((blockJson \ "Block ID").extract[String])
         val status = blockStatusFromJson(blockJson \ "Status")
@@ -897,7 +897,7 @@ private[spark] object JsonProtocol {
         val shuffleId = (json \ "Shuffle ID").extract[Int]
         val mapId = (json \ "Map ID").extract[Int]
         val reduceId = (json \ "Reduce ID").extract[Int]
-        val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+        val message = jsonOption(json \ "Message").map(_.extract[String])
         new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
           message.getOrElse("Unknown reason"))
       case `exceptionFailure` =>
@@ -905,9 +905,9 @@ private[spark] object JsonProtocol {
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
         val fullStackTrace =
-          Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
+          jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
-        val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+        val accumUpdates = jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
           .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
             acc.toInfo(Some(acc.value), None)
@@ -915,21 +915,21 @@ private[spark] object JsonProtocol {
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` =>
-        val killReason = Utils.jsonOption(json \ "Kill Reason")
+        val killReason = jsonOption(json \ "Kill Reason")
           .map(_.extract[String]).getOrElse("unknown reason")
         TaskKilled(killReason)
       case `taskCommitDenied` =>
         // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
         // de/serialization logic was not added until 1.5.1. To provide backward compatibility
         // for reading those logs, we need to provide default values for all the fields.
-        val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
-        val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
-        val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+        val jobId = jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
+        val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
+        val attemptNo = jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
         TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
-        val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
-        val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
-        val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String])
+        val exitCausedByApp = jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
+        val executorId = jsonOption(json \ "Executor ID").map(_.extract[String])
+        val reason = jsonOption(json \ "Loss Reason").map(_.extract[String])
         ExecutorLostFailure(
           executorId.getOrElse("Unknown"),
           exitCausedByApp.getOrElse(true),
@@ -968,11 +968,11 @@ private[spark] object JsonProtocol {
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
-    val scope = Utils.jsonOption(json \ "Scope")
+    val scope = jsonOption(json \ "Scope")
       .map(_.extract[String])
       .map(RDDOperationScope.fromJson)
-    val callsite = Utils.jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
-    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+    val callsite = jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
+    val parentIds = jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
@@ -1029,7 +1029,7 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    Utils.jsonOption(json).map { value =>
+    jsonOption(json).map { value =>
       val properties = new Properties
       mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
       properties
@@ -1058,4 +1058,14 @@ private[spark] object JsonProtocol {
     e
   }
 
+  /** Return an option that translates JNothing to None */
+  private def jsonOption(json: JValue): Option[JValue] = {
+    json match {
+      case JNothing => None
+      case value: JValue => Some(value)
+    }
+  }
+
+  private def emptyJson: JObject = JObject(List[JField]())
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2e2a4a2..29d26ea 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -25,7 +25,7 @@ import java.net._
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, FileChannel}
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
@@ -51,9 +51,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
-import org.json4s._
 import org.slf4j.Logger
 
 import org.apache.spark._
@@ -1017,71 +1015,19 @@ private[spark] object Utils extends Logging {
     " " + (System.currentTimeMillis - startTimeMs) + " ms"
   }
 
-  private def listFilesSafely(file: File): Seq[File] = {
-    if (file.exists()) {
-      val files = file.listFiles()
-      if (files == null) {
-        throw new IOException("Failed to list files for dir: " + file)
-      }
-      files
-    } else {
-      List()
-    }
-  }
-
-  /**
-   * Lists files recursively.
-   */
-  def recursiveList(f: File): Array[File] = {
-    require(f.isDirectory)
-    val current = f.listFiles
-    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
-  }
-
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
    * Throws an exception if deletion is unsuccessful.
    */
-  def deleteRecursively(file: File) {
+  def deleteRecursively(file: File): Unit = {
     if (file != null) {
-      try {
-        if (file.isDirectory && !isSymlink(file)) {
-          var savedIOException: IOException = null
-          for (child <- listFilesSafely(file)) {
-            try {
-              deleteRecursively(child)
-            } catch {
-              // In case of multiple exceptions, only last one will be thrown
-              case ioe: IOException => savedIOException = ioe
-            }
-          }
-          if (savedIOException != null) {
-            throw savedIOException
-          }
-          ShutdownHookManager.removeShutdownDeleteDir(file)
-        }
-      } finally {
-        if (file.delete()) {
-          logTrace(s"${file.getAbsolutePath} has been deleted")
-        } else {
-          // Delete can also fail if the file simply did not exist
-          if (file.exists()) {
-            throw new IOException("Failed to delete: " + file.getAbsolutePath)
-          }
-        }
-      }
+      JavaUtils.deleteRecursively(file)
+      ShutdownHookManager.removeShutdownDeleteDir(file)
     }
   }
 
   /**
-   * Check to see if file is a symbolic link.
-   */
-  def isSymlink(file: File): Boolean = {
-    return Files.isSymbolicLink(Paths.get(file.toURI))
-  }
-
-  /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *
    * @param dir must be the path to a directory, or IllegalArgumentException is thrown
@@ -1828,7 +1774,7 @@ private[spark] object Utils extends Logging {
    * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
    * in the current version of Scala.
    */
-  def getIteratorSize[T](iterator: Iterator[T]): Long = {
+  def getIteratorSize(iterator: Iterator[_]): Long = {
     var count = 0L
     while (iterator.hasNext) {
       count += 1L
@@ -1875,17 +1821,6 @@ private[spark] object Utils extends Logging {
     obj.getClass.getSimpleName.replace("$", "")
   }
 
-  /** Return an option that translates JNothing to None */
-  def jsonOption(json: JValue): Option[JValue] = {
-    json match {
-      case JNothing => None
-      case value: JValue => Some(value)
-    }
-  }
-
-  /** Return an empty JSON object */
-  def emptyJson: JsonAST.JObject = JObject(List[JField]())
-
   /**
    * Return a Hadoop FileSystem with the scheme encoded in the given path.
    */
@@ -1901,15 +1836,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return the absolute path of a file in the given directory.
-   */
-  def getFilePath(dir: File, fileName: String): Path = {
-    assert(dir.isDirectory)
-    val path = new File(dir, fileName).getAbsolutePath
-    new Path(path)
-  }
-
-  /**
    * Whether the underlying operating system is Windows.
    */
   val isWindows = SystemUtils.IS_OS_WINDOWS
@@ -1932,13 +1858,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Strip the directory from a path name
-   */
-  def stripDirectory(path: String): String = {
-    new File(path).getName
-  }
-
-  /**
    * Terminates a process waiting for at most the specified duration.
    *
    * @return the process exit value if it was successfully terminated, else None
@@ -2349,36 +2268,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * config a log4j properties used for testsuite
-   */
-  def configTestLog4j(level: String): Unit = {
-    val pro = new Properties()
-    pro.put("log4j.rootLogger", s"$level, console")
-    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
-    pro.put("log4j.appender.console.target", "System.err")
-    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
-    pro.put("log4j.appender.console.layout.ConversionPattern",
-      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
-    PropertyConfigurator.configure(pro)
-  }
-
-  def invoke(
-      clazz: Class[_],
-      obj: AnyRef,
-      methodName: String,
-      args: (Class[_], AnyRef)*): AnyRef = {
-    val (types, values) = args.unzip
-    val method = clazz.getDeclaredMethod(methodName, types: _*)
-    method.setAccessible(true)
-    method.invoke(obj, values.toSeq: _*)
-  }
-
-  // Limit of bytes for total size of results (default is 1GB)
-  def getMaxResultSize(conf: SparkConf): Long = {
-    memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
-  }
-
-  /**
    * Return the current system LD_LIBRARY_PATH name
    */
   def libraryPathEnvName: String = {
@@ -2611,16 +2500,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Unions two comma-separated lists of files and filters out empty strings.
-   */
-  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
-    var allFiles = Set.empty[String]
-    leftList.foreach { value => allFiles ++= value.split(",") }
-    rightList.foreach { value => allFiles ++= value.split(",") }
-    allFiles.filter { _.nonEmpty }
-  }
-
-  /**
    * Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
    * these jars through file server. In the YARN mode, it will return an empty list, since YARN
    * has its own mechanism to distribute jars.

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 24a55df..0d5c5ea 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -95,7 +95,7 @@ public class UnsafeShuffleWriterSuite {
   @SuppressWarnings("unchecked")
   public void setUp() throws IOException {
     MockitoAnnotations.initMocks(this);
-    tempDir = Utils.createTempDir("test", "test");
+    tempDir = Utils.createTempDir(null, "test");
     mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
     partitionSizesInMergedFile = null;
     spillFilesCreated.clear();

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 962945e..896cd2e 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -51,7 +51,7 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf
     val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
     sc.parallelize(1 to 100, 4).count()

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 803a38d..d265643 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.TestUtils
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmit._
@@ -761,18 +762,6 @@ class SparkSubmitSuite
     }
   }
 
-  test("comma separated list of files are unioned correctly") {
-    val left = Option("/tmp/a.jar,/tmp/b.jar")
-    val right = Option("/tmp/c.jar,/tmp/a.jar")
-    val emptyString = Option("")
-    Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
-    Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
-    Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
-    Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
-    Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
-    Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
-  }
-
   test("support glob path") {
     val tmpJarDir = Utils.createTempDir()
     val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
@@ -1042,6 +1031,7 @@ class SparkSubmitSuite
 
     assert(exception.getMessage() === "hello")
   }
+
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
@@ -1076,7 +1066,7 @@ object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
 
 object JarCreationTest extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
@@ -1100,7 +1090,7 @@ object JarCreationTest extends Logging {
 
 object SimpleApplicationTest {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val configs = Seq("spark.master", "spark.app.name")

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 73e7b3f..e24d550 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -47,7 +47,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   }
 
   test("Simple replay") {
-    val logFilePath = Utils.getFilePath(testDir, "events.txt")
+    val logFilePath = getFilePath(testDir, "events.txt")
     val fstream = fileSystem.create(logFilePath)
     val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
@@ -97,7 +97,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
       // scalastyle:on println
     }
 
-    val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+    val logFilePath = getFilePath(testDir, "events.lz4.inprogress")
     val bytes = buffered.toByteArray
     Utils.tryWithResource(fileSystem.create(logFilePath)) { fstream =>
       fstream.write(bytes, 0, buffered.size)
@@ -129,7 +129,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   }
 
   test("Replay incompatible event log") {
-    val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
+    val logFilePath = getFilePath(testDir, "incompatible.txt")
     val fstream = fileSystem.create(logFilePath)
     val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
@@ -226,6 +226,12 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     }
   }
 
+  private def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
   /**
    * A simple listener that buffers all the events it receives.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eaea6b0..3b42731 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -648,6 +648,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   test("fetch hcfs dir") {
     val tempDir = Utils.createTempDir()
     val sourceDir = new File(tempDir, "source-dir")
+    sourceDir.mkdir()
     val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
     val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
     val targetDir = new File(tempDir, "target-dir")

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index e2fa575..e65e3aa 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -229,7 +229,7 @@ This file is divided into 3 sections:
 
   <check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
     <parameters><parameter name="regex">extractOpt</parameter></parameters>
-    <customMessage>Use Utils.jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter
+    <customMessage>Use jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter
     is slower.  </customMessage>
   </check>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 523f7cf..8a3bbd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -39,8 +39,8 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
   protected override def beforeAll(): Unit = {
     super.beforeAll()
 
-    orcTableAsDir = Utils.createTempDir("orctests", "sparksql")
-    orcTableDir = Utils.createTempDir("orctests", "sparksql")
+    orcTableAsDir = Utils.createTempDir(namePrefix = "orctests")
+    orcTableDir = Utils.createTempDir(namePrefix = "orctests")
 
     sparkContext
       .makeRDD(1 to 10)

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 122d287..534d8bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -21,13 +21,13 @@ import java.io.File
 
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.TestUtils
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.Utils
 
 
 trait SQLMetricsTestUtils extends SQLTestUtils {
@@ -91,7 +91,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
         (0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
           .write.format(dataFormat).mode("overwrite").insertInto(tableName)
       }
-      assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
+      assert(TestUtils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
     }
   }
 
@@ -121,7 +121,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
           .mode("overwrite")
           .insertInto(tableName)
       }
-      assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
+      assert(TestUtils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 0fe33e8..27c983f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -22,6 +22,7 @@ import java.sql.Timestamp
 
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
+import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -86,15 +87,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempDir { f =>
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+      assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+      assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
     }
   }
 
@@ -106,7 +107,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
         .option("maxRecordsPerFile", 1)
         .mode("overwrite")
         .parquet(f.getAbsolutePath)
-      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
     }
   }
 
@@ -133,14 +134,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, ts)).toDF("i", "ts")
     withTempPath { f =>
       df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       checkPartitionValues(files.head, "2016-12-01 00:00:00")
     }
     withTempPath { f =>
       df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
         .partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       // use timeZone option "GMT" to format partition value.
       checkPartitionValues(files.head, "2016-12-01 08:00:00")
@@ -148,7 +149,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempPath { f =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
         df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-        val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
         assert(files.length == 1)
         // if there isn't timeZone option, then use session local timezone.
         checkPartitionValues(files.head, "2016-12-01 08:00:00")

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 496f8c8..b32c547 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -804,7 +804,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   protected var metastorePath: File = _
   protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
 
-  private val pidDir: File = Utils.createTempDir("thriftserver-pid")
+  private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
   protected var logPath: File = _
   protected var operationLogPath: File = _
   private var logTailingProcess: Process = _

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 2d31781..079fe45 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -330,7 +330,7 @@ class HiveSparkSubmitSuite
 
 object SetMetastoreURLTest extends Logging {
   def main(args: Array[String]): Unit = {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
 
     val sparkConf = new SparkConf(loadDefaults = true)
     val builder = SparkSession.builder()
@@ -368,7 +368,7 @@ object SetMetastoreURLTest extends Logging {
 
 object SetWarehouseLocationTest extends Logging {
   def main(args: Array[String]): Unit = {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
 
     val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false")
     val providedExpectedWarehouseLocation =
@@ -447,7 +447,7 @@ object SetWarehouseLocationTest extends Logging {
 // can load the jar defined with the function.
 object TemporaryHiveUDFTest extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     conf.set("spark.ui.enabled", "false")
     val sc = new SparkContext(conf)
@@ -485,7 +485,7 @@ object TemporaryHiveUDFTest extends Logging {
 // can load the jar defined with the function.
 object PermanentHiveUDFTest1 extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     conf.set("spark.ui.enabled", "false")
     val sc = new SparkContext(conf)
@@ -523,7 +523,7 @@ object PermanentHiveUDFTest1 extends Logging {
 // can load the jar defined with the function.
 object PermanentHiveUDFTest2 extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     conf.set("spark.ui.enabled", "false")
     val sc = new SparkContext(conf)
@@ -558,7 +558,7 @@ object PermanentHiveUDFTest2 extends Logging {
 // We test if we can load user jars in both driver and executors when HiveContext is used.
 object SparkSubmitClassLoaderTest extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     val conf = new SparkConf()
     val hiveWarehouseLocation = Utils.createTempDir()
     conf.set("spark.ui.enabled", "false")
@@ -628,7 +628,7 @@ object SparkSubmitClassLoaderTest extends Logging {
 // We test if we can correctly set spark sql configurations when HiveContext is used.
 object SparkSQLConfTest extends Logging {
   def main(args: Array[String]) {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
     // We override the SparkConf to add spark.sql.hive.metastore.version and
     // spark.sql.hive.metastore.jars to the beginning of the conf entry array.
     // So, if metadataHive get initialized after we set spark.sql.hive.metastore.version but
@@ -669,7 +669,7 @@ object SPARK_9757 extends QueryTest {
   protected var spark: SparkSession = _
 
   def main(args: Array[String]): Unit = {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
 
     val hiveWarehouseLocation = Utils.createTempDir()
     val sparkContext = new SparkContext(
@@ -718,7 +718,7 @@ object SPARK_11009 extends QueryTest {
   protected var spark: SparkSession = _
 
   def main(args: Array[String]): Unit = {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
 
     val sparkContext = new SparkContext(
       new SparkConf()
@@ -749,7 +749,7 @@ object SPARK_14244 extends QueryTest {
   protected var spark: SparkSession = _
 
   def main(args: Array[String]): Unit = {
-    Utils.configTestLog4j("INFO")
+    TestUtils.configTestLog4j("INFO")
 
     val sparkContext = new SparkContext(
       new SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index ee2fd45..19b621f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -97,7 +97,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
     val batchDurationMillis = batchDuration.milliseconds
 
     // Setup the stream computation
-    val checkpointDir = Utils.createTempDir(this.getClass.getSimpleName()).toString
+    val checkpointDir = Utils.createTempDir(namePrefix = this.getClass.getSimpleName()).toString
     logDebug(s"Using checkpoint directory $checkpointDir")
     val ssc = createContextForCheckpointOperation(batchDuration)
     require(ssc.conf.get("spark.streaming.clock") === classOf[ManualClock].getName,

http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 3b662ec..06c0c2a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -39,7 +39,7 @@ class MapWithStateSuite extends SparkFunSuite
 
   before {
     StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
-    checkpointDir = Utils.createTempDir("checkpoint")
+    checkpointDir = Utils.createTempDir(namePrefix = "checkpoint")
   }
 
   after {

