Posted to commits@spark.apache.org by li...@apache.org on 2018/03/07 21:42:10 UTC
spark git commit: [SPARK-23550][CORE] Cleanup `Utils`.
Repository: spark
Updated Branches:
refs/heads/master 53561d27c -> c99fc9ad9
[SPARK-23550][CORE] Cleanup `Utils`.
A few different things are going on here:
- Remove unused methods.
- Move JSON methods to the only class that uses them.
- Move test-only methods to TestUtils.
- Make getMaxResultSize() a config constant.
- Reuse functionality from existing libraries (JRE or JavaUtils) where possible.
The change also adjusts a few tests to call `Utils.createTempDir` correctly,
so that temp dirs are created under the designated top-level temp dir instead of
potentially polluting the git index.
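As a rough sketch of the config-constant item above (this only restates what the diff below does; it is not additional code in the commit), the ad-hoc parsing in Utils.getMaxResultSize() becomes a typed entry in org.apache.spark.internal.config that callers read straight from SparkConf:

    // New constant (see the package.scala hunk below):
    private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
      .doc("Size limit for results.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("1g")

    // Callers such as Executor and TaskSetManager then read the limit in bytes:
    private val maxResultSize = conf.get(MAX_RESULT_SIZE)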
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #20706 from vanzin/SPARK-23550.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c99fc9ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c99fc9ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c99fc9ad
Branch: refs/heads/master
Commit: c99fc9ad9b600095baba003053dbf84304ca392b
Parents: 53561d2
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Mar 7 13:42:06 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Mar 7 13:42:06 2018 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/TestUtils.scala | 26 +++-
.../spark/deploy/SparkSubmitArguments.scala | 4 +-
.../org/apache/spark/executor/Executor.scala | 4 +-
.../spark/internal/config/ConfigBuilder.scala | 3 +-
.../apache/spark/internal/config/package.scala | 5 +
.../apache/spark/scheduler/TaskSetManager.scala | 3 +-
.../org/apache/spark/util/JsonProtocol.scala | 124 ++++++++++--------
.../scala/org/apache/spark/util/Utils.scala | 131 +------------------
.../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +-
.../scala/org/apache/spark/DriverSuite.scala | 2 +-
.../apache/spark/deploy/SparkSubmitSuite.scala | 18 +--
.../spark/scheduler/ReplayListenerSuite.scala | 12 +-
.../org/apache/spark/util/UtilsSuite.scala | 1 +
scalastyle-config.xml | 2 +-
.../datasources/orc/OrcSourceSuite.scala | 4 +-
.../execution/metric/SQLMetricsTestUtils.scala | 6 +-
.../sql/sources/PartitionedWriteSuite.scala | 15 ++-
.../thriftserver/HiveThriftServer2Suites.scala | 2 +-
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 20 +--
.../spark/streaming/CheckpointSuite.scala | 2 +-
.../spark/streaming/MapWithStateSuite.scala | 2 +-
21 files changed, 152 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 93e7ee3..b5c4c70 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,7 +22,7 @@ import java.net.{HttpURLConnection, URI, URL}
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.security.cert.X509Certificate
-import java.util.Arrays
+import java.util.{Arrays, Properties}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl._
@@ -35,6 +35,7 @@ import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try
import com.google.common.io.{ByteStreams, Files}
+import org.apache.log4j.PropertyConfigurator
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -256,6 +257,29 @@ private[spark] object TestUtils {
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}
+ /**
+ * config a log4j properties used for testsuite
+ */
+ def configTestLog4j(level: String): Unit = {
+ val pro = new Properties()
+ pro.put("log4j.rootLogger", s"$level, console")
+ pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
+ pro.put("log4j.appender.console.target", "System.err")
+ pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
+ pro.put("log4j.appender.console.layout.ConversionPattern",
+ "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
+ PropertyConfigurator.configure(pro)
+ }
+
+ /**
+ * Lists files recursively.
+ */
+ def recursiveList(f: File): Array[File] = {
+ require(f.isDirectory)
+ val current = f.listFiles
+ current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+ }
+
}
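A minimal usage sketch for the two helpers added above, with a hypothetical test directory; the calls mirror the suites changed later in this diff:

    import java.io.File
    import org.apache.spark.TestUtils

    object TestUtilsExample {
      def main(args: Array[String]): Unit = {
        // Send log4j output to stderr at the requested level, as the forked test JVMs do.
        TestUtils.configTestLog4j("INFO")

        // Count generated output files under a directory (the path is hypothetical).
        val dir = new File("/tmp/spark-test-output")
        val parts = TestUtils.recursiveList(dir).count(_.getName.startsWith("part-"))
        println(s"found $parts part files")
      }
    }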
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9db7a1f..e7796d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.{ByteArrayOutputStream, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.nio.charset.StandardCharsets
@@ -233,7 +233,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
- name = Utils.stripDirectory(primaryResource)
+ name = new File(primaryResource).getName()
}
// Action should be SUBMIT unless otherwise specified
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2c3a8ef..dcec3ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
@@ -141,8 +142,7 @@ private[spark] class Executor(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
RpcUtils.maxMessageSizeBytes(conf))
- // Limit of bytes for total size of results (default is 1GB)
- private val maxResultSize = Utils.getMaxResultSize(conf)
+ private val maxResultSize = conf.get(MAX_RESULT_SIZE)
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index b0cd711..f27aca0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -23,6 +23,7 @@ import java.util.regex.PatternSyntaxException
import scala.util.matching.Regex
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.util.Utils
private object ConfigHelpers {
@@ -45,7 +46,7 @@ private object ConfigHelpers {
}
def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
- str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+ Utils.stringToSeq(str).map(converter)
}
def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbfcfba..a313ad0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -520,4 +520,9 @@ package object config {
.checkValue(v => v > 0, "The threshold should be positive.")
.createWithDefault(10000000)
+ private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
+ .doc("Size limit for results.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("1g")
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 886c2c9..d958658 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -64,8 +64,7 @@ private[spark] class TaskSetManager(
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
- // Limit of bytes for total size of results (default is 1GB)
- val maxResultSize = Utils.getMaxResultSize(conf)
+ val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
val speculationEnabled = conf.getBoolean("spark.speculation", false)
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ff83301..40383fe 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -48,7 +48,7 @@ import org.apache.spark.storage._
* To ensure that we provide these guarantees, follow these rules when modifying these methods:
*
* - Never delete any JSON fields.
- * - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields
+ * - Any new JSON fields should be optional; use `jsonOption` when reading these fields
* in `*FromJson` methods.
*/
private[spark] object JsonProtocol {
@@ -408,7 +408,7 @@ private[spark] object JsonProtocol {
("Loss Reason" -> reason.map(_.toString))
case taskKilled: TaskKilled =>
("Kill Reason" -> taskKilled.reason)
- case _ => Utils.emptyJson
+ case _ => emptyJson
}
("Reason" -> reason) ~ json
}
@@ -422,7 +422,7 @@ private[spark] object JsonProtocol {
def jobResultToJson(jobResult: JobResult): JValue = {
val result = Utils.getFormattedClassName(jobResult)
val json = jobResult match {
- case JobSucceeded => Utils.emptyJson
+ case JobSucceeded => emptyJson
case jobFailed: JobFailed =>
JObject("Exception" -> exceptionToJson(jobFailed.exception))
}
@@ -573,7 +573,7 @@ private[spark] object JsonProtocol {
def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId =
- Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+ jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val taskInfo = taskInfoFromJson(json \ "Task Info")
SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
}
@@ -586,7 +586,7 @@ private[spark] object JsonProtocol {
def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId =
- Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+ jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val taskType = (json \ "Task Type").extract[String]
val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
val taskInfo = taskInfoFromJson(json \ "Task Info")
@@ -597,11 +597,11 @@ private[spark] object JsonProtocol {
def jobStartFromJson(json: JValue): SparkListenerJobStart = {
val jobId = (json \ "Job ID").extract[Int]
val submissionTime =
- Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
+ jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
val properties = propertiesFromJson(json \ "Properties")
// The "Stage Infos" field was added in Spark 1.2.0
- val stageInfos = Utils.jsonOption(json \ "Stage Infos")
+ val stageInfos = jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map { id =>
new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
@@ -613,7 +613,7 @@ private[spark] object JsonProtocol {
def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
val jobId = (json \ "Job ID").extract[Int]
val completionTime =
- Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
+ jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
val jobResult = jobResultFromJson(json \ "Job Result")
SparkListenerJobEnd(jobId, completionTime, jobResult)
}
@@ -630,15 +630,15 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
- val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
- val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
- val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+ val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ val maxOnHeapMem = jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+ val maxOffHeapMem = jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
SparkListenerBlockManagerRemoved(time, blockManagerId)
}
@@ -648,11 +648,11 @@ private[spark] object JsonProtocol {
def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
val appName = (json \ "App Name").extract[String]
- val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
+ val appId = jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
- val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
+ val appAttemptId = jsonOption(json \ "App Attempt ID").map(_.extract[String])
+ val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
}
@@ -703,19 +703,19 @@ private[spark] object JsonProtocol {
def stageInfoFromJson(json: JValue): StageInfo = {
val stageId = (json \ "Stage ID").extract[Int]
- val attemptId = Utils.jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
+ val attemptId = jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
- val parentIds = Utils.jsonOption(json \ "Parent IDs")
+ val parentIds = jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
- val details = Utils.jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
- val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
- val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
- val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
+ val details = jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
+ val submissionTime = jsonOption(json \ "Submission Time").map(_.extract[Long])
+ val completionTime = jsonOption(json \ "Completion Time").map(_.extract[Long])
+ val failureReason = jsonOption(json \ "Failure Reason").map(_.extract[String])
val accumulatedValues = {
- Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+ jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq.empty[AccumulableInfo]
}
@@ -735,17 +735,17 @@ private[spark] object JsonProtocol {
def taskInfoFromJson(json: JValue): TaskInfo = {
val taskId = (json \ "Task ID").extract[Long]
val index = (json \ "Index").extract[Int]
- val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
+ val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String].intern()
val host = (json \ "Host").extract[String].intern()
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
- val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
+ val speculative = jsonOption(json \ "Speculative").exists(_.extract[Boolean])
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
- val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
- val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
+ val killed = jsonOption(json \ "Killed").exists(_.extract[Boolean])
+ val accumulables = jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq.empty[AccumulableInfo]
}
@@ -762,13 +762,13 @@ private[spark] object JsonProtocol {
def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
val id = (json \ "ID").extract[Long]
- val name = Utils.jsonOption(json \ "Name").map(_.extract[String])
- val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
- val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
- val internal = Utils.jsonOption(json \ "Internal").exists(_.extract[Boolean])
+ val name = jsonOption(json \ "Name").map(_.extract[String])
+ val update = jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
+ val value = jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
+ val internal = jsonOption(json \ "Internal").exists(_.extract[Boolean])
val countFailedValues =
- Utils.jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
- val metadata = Utils.jsonOption(json \ "Metadata").map(_.extract[String])
+ jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
+ val metadata = jsonOption(json \ "Metadata").map(_.extract[String])
new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
}
@@ -821,49 +821,49 @@ private[spark] object JsonProtocol {
metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
// Shuffle read metrics
- Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
+ jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
val readMetrics = metrics.createTempShuffleReadMetrics()
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
- Utils.jsonOption(readJson \ "Remote Bytes Read To Disk")
+ jsonOption(readJson \ "Remote Bytes Read To Disk")
.foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extract[Long])}
readMetrics.incLocalBytesRead(
- Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
+ jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
readMetrics.incRecordsRead(
- Utils.jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
+ jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
metrics.mergeShuffleReadMetrics()
}
// Shuffle write metrics
// TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
- Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
+ jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
val writeMetrics = metrics.shuffleWriteMetrics
writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
writeMetrics.incRecordsWritten(
- Utils.jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
+ jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
}
// Output metrics
- Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
+ jsonOption(json \ "Output Metrics").foreach { outJson =>
val outputMetrics = metrics.outputMetrics
outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
outputMetrics.setRecordsWritten(
- Utils.jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
+ jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
}
// Input metrics
- Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
+ jsonOption(json \ "Input Metrics").foreach { inJson =>
val inputMetrics = metrics.inputMetrics
inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
inputMetrics.incRecordsRead(
- Utils.jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
+ jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
}
// Updated blocks
- Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
+ jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson =>
val id = BlockId((blockJson \ "Block ID").extract[String])
val status = blockStatusFromJson(blockJson \ "Status")
@@ -897,7 +897,7 @@ private[spark] object JsonProtocol {
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Int]
val reduceId = (json \ "Reduce ID").extract[Int]
- val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+ val message = jsonOption(json \ "Message").map(_.extract[String])
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
message.getOrElse("Unknown reason"))
case `exceptionFailure` =>
@@ -905,9 +905,9 @@ private[spark] object JsonProtocol {
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace =
- Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
+ jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
// Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
- val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+ val accumUpdates = jsonOption(json \ "Accumulator Updates")
.map(_.extract[List[JValue]].map(accumulableInfoFromJson))
.getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
acc.toInfo(Some(acc.value), None)
@@ -915,21 +915,21 @@ private[spark] object JsonProtocol {
ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
case `taskResultLost` => TaskResultLost
case `taskKilled` =>
- val killReason = Utils.jsonOption(json \ "Kill Reason")
+ val killReason = jsonOption(json \ "Kill Reason")
.map(_.extract[String]).getOrElse("unknown reason")
TaskKilled(killReason)
case `taskCommitDenied` =>
// Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
// de/serialization logic was not added until 1.5.1. To provide backward compatibility
// for reading those logs, we need to provide default values for all the fields.
- val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
- val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
- val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+ val jobId = jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
+ val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
+ val attemptNo = jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
- val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
- val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String])
+ val exitCausedByApp = jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
+ val executorId = jsonOption(json \ "Executor ID").map(_.extract[String])
+ val reason = jsonOption(json \ "Loss Reason").map(_.extract[String])
ExecutorLostFailure(
executorId.getOrElse("Unknown"),
exitCausedByApp.getOrElse(true),
@@ -968,11 +968,11 @@ private[spark] object JsonProtocol {
def rddInfoFromJson(json: JValue): RDDInfo = {
val rddId = (json \ "RDD ID").extract[Int]
val name = (json \ "Name").extract[String]
- val scope = Utils.jsonOption(json \ "Scope")
+ val scope = jsonOption(json \ "Scope")
.map(_.extract[String])
.map(RDDOperationScope.fromJson)
- val callsite = Utils.jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
- val parentIds = Utils.jsonOption(json \ "Parent IDs")
+ val callsite = jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
+ val parentIds = jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
val storageLevel = storageLevelFromJson(json \ "Storage Level")
@@ -1029,7 +1029,7 @@ private[spark] object JsonProtocol {
}
def propertiesFromJson(json: JValue): Properties = {
- Utils.jsonOption(json).map { value =>
+ jsonOption(json).map { value =>
val properties = new Properties
mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
properties
@@ -1058,4 +1058,14 @@ private[spark] object JsonProtocol {
e
}
+ /** Return an option that translates JNothing to None */
+ private def jsonOption(json: JValue): Option[JValue] = {
+ json match {
+ case JNothing => None
+ case value: JValue => Some(value)
+ }
+ }
+
+ private def emptyJson: JObject = JObject(List[JField]())
+
}
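A small sketch of the rule mentioned in the class comment above: a new, optional field should be read through the (now private) jsonOption helper so that logs written before the field existed still parse. The field name "Example Field" is hypothetical:

    // Inside a *FromJson method in JsonProtocol:
    val exampleField: Option[Long] =
      jsonOption(json \ "Example Field").map(_.extract[Long])
    // Or with a fallback for older logs that lack the field:
    val exampleOrDefault: Long =
      jsonOption(json \ "Example Field").map(_.extract[Long]).getOrElse(-1L)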
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2e2a4a2..29d26ea 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -25,7 +25,7 @@ import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{Channels, FileChannel}
import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
@@ -51,9 +51,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
-import org.json4s._
import org.slf4j.Logger
import org.apache.spark._
@@ -1017,71 +1015,19 @@ private[spark] object Utils extends Logging {
" " + (System.currentTimeMillis - startTimeMs) + " ms"
}
- private def listFilesSafely(file: File): Seq[File] = {
- if (file.exists()) {
- val files = file.listFiles()
- if (files == null) {
- throw new IOException("Failed to list files for dir: " + file)
- }
- files
- } else {
- List()
- }
- }
-
- /**
- * Lists files recursively.
- */
- def recursiveList(f: File): Array[File] = {
- require(f.isDirectory)
- val current = f.listFiles
- current ++ current.filter(_.isDirectory).flatMap(recursiveList)
- }
-
/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
* Throws an exception if deletion is unsuccessful.
*/
- def deleteRecursively(file: File) {
+ def deleteRecursively(file: File): Unit = {
if (file != null) {
- try {
- if (file.isDirectory && !isSymlink(file)) {
- var savedIOException: IOException = null
- for (child <- listFilesSafely(file)) {
- try {
- deleteRecursively(child)
- } catch {
- // In case of multiple exceptions, only last one will be thrown
- case ioe: IOException => savedIOException = ioe
- }
- }
- if (savedIOException != null) {
- throw savedIOException
- }
- ShutdownHookManager.removeShutdownDeleteDir(file)
- }
- } finally {
- if (file.delete()) {
- logTrace(s"${file.getAbsolutePath} has been deleted")
- } else {
- // Delete can also fail if the file simply did not exist
- if (file.exists()) {
- throw new IOException("Failed to delete: " + file.getAbsolutePath)
- }
- }
- }
+ JavaUtils.deleteRecursively(file)
+ ShutdownHookManager.removeShutdownDeleteDir(file)
}
}
/**
- * Check to see if file is a symbolic link.
- */
- def isSymlink(file: File): Boolean = {
- return Files.isSymbolicLink(Paths.get(file.toURI))
- }
-
- /**
* Determines if a directory contains any files newer than cutoff seconds.
*
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
@@ -1828,7 +1774,7 @@ private[spark] object Utils extends Logging {
* [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
* in the current version of Scala.
*/
- def getIteratorSize[T](iterator: Iterator[T]): Long = {
+ def getIteratorSize(iterator: Iterator[_]): Long = {
var count = 0L
while (iterator.hasNext) {
count += 1L
@@ -1875,17 +1821,6 @@ private[spark] object Utils extends Logging {
obj.getClass.getSimpleName.replace("$", "")
}
- /** Return an option that translates JNothing to None */
- def jsonOption(json: JValue): Option[JValue] = {
- json match {
- case JNothing => None
- case value: JValue => Some(value)
- }
- }
-
- /** Return an empty JSON object */
- def emptyJson: JsonAST.JObject = JObject(List[JField]())
-
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
@@ -1901,15 +1836,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Return the absolute path of a file in the given directory.
- */
- def getFilePath(dir: File, fileName: String): Path = {
- assert(dir.isDirectory)
- val path = new File(dir, fileName).getAbsolutePath
- new Path(path)
- }
-
- /**
* Whether the underlying operating system is Windows.
*/
val isWindows = SystemUtils.IS_OS_WINDOWS
@@ -1932,13 +1858,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Strip the directory from a path name
- */
- def stripDirectory(path: String): String = {
- new File(path).getName
- }
-
- /**
* Terminates a process waiting for at most the specified duration.
*
* @return the process exit value if it was successfully terminated, else None
@@ -2349,36 +2268,6 @@ private[spark] object Utils extends Logging {
}
/**
- * config a log4j properties used for testsuite
- */
- def configTestLog4j(level: String): Unit = {
- val pro = new Properties()
- pro.put("log4j.rootLogger", s"$level, console")
- pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
- pro.put("log4j.appender.console.target", "System.err")
- pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
- pro.put("log4j.appender.console.layout.ConversionPattern",
- "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
- PropertyConfigurator.configure(pro)
- }
-
- def invoke(
- clazz: Class[_],
- obj: AnyRef,
- methodName: String,
- args: (Class[_], AnyRef)*): AnyRef = {
- val (types, values) = args.unzip
- val method = clazz.getDeclaredMethod(methodName, types: _*)
- method.setAccessible(true)
- method.invoke(obj, values.toSeq: _*)
- }
-
- // Limit of bytes for total size of results (default is 1GB)
- def getMaxResultSize(conf: SparkConf): Long = {
- memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
- }
-
- /**
* Return the current system LD_LIBRARY_PATH name
*/
def libraryPathEnvName: String = {
@@ -2611,16 +2500,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Unions two comma-separated lists of files and filters out empty strings.
- */
- def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
- var allFiles = Set.empty[String]
- leftList.foreach { value => allFiles ++= value.split(",") }
- rightList.foreach { value => allFiles ++= value.split(",") }
- allFiles.filter { _.nonEmpty }
- }
-
- /**
* Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
* these jars through file server. In the YARN mode, it will return an empty list, since YARN
* has its own mechanism to distribute jars.
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 24a55df..0d5c5ea 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -95,7 +95,7 @@ public class UnsafeShuffleWriterSuite {
@SuppressWarnings("unchecked")
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
- tempDir = Utils.createTempDir("test", "test");
+ tempDir = Utils.createTempDir(null, "test");
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
partitionSizesInMergedFile = null;
spillFilesCreated.clear();
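The one-line change above is the test-cleanup theme from the commit message: Utils.createTempDir takes (root, namePrefix), so passing a name as the first positional argument turns it into the root and creates the directory relative to the working tree, i.e. inside the git checkout. A hedged sketch of the two call shapes, with the Spark signature shown approximately:

    // Approximate signature in org.apache.spark.util.Utils:
    //   def createTempDir(root: String = System.getProperty("java.io.tmpdir"),
    //                     namePrefix: String = "spark"): File

    // Problematic: "checkpoint" is treated as the root, so ./checkpoint shows up
    // under the current working directory.
    val bad = Utils.createTempDir("checkpoint")

    // Correct: keep the default root and only override the prefix, so the
    // directory lands under the designated top-level temp dir.
    val good = Utils.createTempDir(namePrefix = "checkpoint")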
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 962945e..896cd2e 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -51,7 +51,7 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
*/
object DriverWithoutCleanup {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf
val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
sc.parallelize(1 to 100, 4).count()
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 803a38d..d265643 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._
import org.apache.spark._
+import org.apache.spark.TestUtils
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
@@ -761,18 +762,6 @@ class SparkSubmitSuite
}
}
- test("comma separated list of files are unioned correctly") {
- val left = Option("/tmp/a.jar,/tmp/b.jar")
- val right = Option("/tmp/c.jar,/tmp/a.jar")
- val emptyString = Option("")
- Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
- Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
- Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
- Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
- Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
- Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
- }
-
test("support glob path") {
val tmpJarDir = Utils.createTempDir()
val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
@@ -1042,6 +1031,7 @@ class SparkSubmitSuite
assert(exception.getMessage() === "hello")
}
+
}
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
@@ -1076,7 +1066,7 @@ object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
object JarCreationTest extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
val sc = new SparkContext(conf)
val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
@@ -1100,7 +1090,7 @@ object JarCreationTest extends Logging {
object SimpleApplicationTest {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
val sc = new SparkContext(conf)
val configs = Seq("spark.master", "spark.app.name")
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 73e7b3f..e24d550 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -47,7 +47,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
}
test("Simple replay") {
- val logFilePath = Utils.getFilePath(testDir, "events.txt")
+ val logFilePath = getFilePath(testDir, "events.txt")
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
@@ -97,7 +97,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
// scalastyle:on println
}
- val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+ val logFilePath = getFilePath(testDir, "events.lz4.inprogress")
val bytes = buffered.toByteArray
Utils.tryWithResource(fileSystem.create(logFilePath)) { fstream =>
fstream.write(bytes, 0, buffered.size)
@@ -129,7 +129,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
}
test("Replay incompatible event log") {
- val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
+ val logFilePath = getFilePath(testDir, "incompatible.txt")
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
@@ -226,6 +226,12 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
}
}
+ private def getFilePath(dir: File, fileName: String): Path = {
+ assert(dir.isDirectory)
+ val path = new File(dir, fileName).getAbsolutePath
+ new Path(path)
+ }
+
/**
* A simple listener that buffers all the events it receives.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eaea6b0..3b42731 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -648,6 +648,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("fetch hcfs dir") {
val tempDir = Utils.createTempDir()
val sourceDir = new File(tempDir, "source-dir")
+ sourceDir.mkdir()
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
val targetDir = new File(tempDir, "target-dir")
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index e2fa575..e65e3aa 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -229,7 +229,7 @@ This file is divided into 3 sections:
<check customId="extractopt" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">extractOpt</parameter></parameters>
- <customMessage>Use Utils.jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter
+ <customMessage>Use jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter
is slower. </customMessage>
</check>
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 523f7cf..8a3bbd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -39,8 +39,8 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
protected override def beforeAll(): Unit = {
super.beforeAll()
- orcTableAsDir = Utils.createTempDir("orctests", "sparksql")
- orcTableDir = Utils.createTempDir("orctests", "sparksql")
+ orcTableAsDir = Utils.createTempDir(namePrefix = "orctests")
+ orcTableDir = Utils.createTempDir(namePrefix = "orctests")
sparkContext
.makeRDD(1 to 10)
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 122d287..534d8bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -21,13 +21,13 @@ import java.io.File
import scala.collection.mutable.HashMap
+import org.apache.spark.TestUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.Utils
trait SQLMetricsTestUtils extends SQLTestUtils {
@@ -91,7 +91,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
(0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
.write.format(dataFormat).mode("overwrite").insertInto(tableName)
}
- assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
+ assert(TestUtils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
}
}
@@ -121,7 +121,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
.mode("overwrite")
.insertInto(tableName)
}
- assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
+ assert(TestUtils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 0fe33e8..27c983f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -22,6 +22,7 @@ import java.sql.Timestamp
import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.TestUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -86,15 +87,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
withTempDir { f =>
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
- assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+ assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
- assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+ assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
.write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
- assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+ assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
}
}
@@ -106,7 +107,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
.option("maxRecordsPerFile", 1)
.mode("overwrite")
.parquet(f.getAbsolutePath)
- assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+ assert(TestUtils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
}
}
@@ -133,14 +134,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, ts)).toDF("i", "ts")
withTempPath { f =>
df.write.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
checkPartitionValues(files.head, "2016-12-01 00:00:00")
}
withTempPath { f =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
// use timeZone option "GMT" to format partition value.
checkPartitionValues(files.head, "2016-12-01 08:00:00")
@@ -148,7 +149,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
withTempPath { f =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
df.write.partitionBy("ts").parquet(f.getAbsolutePath)
- val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+ val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
assert(files.length == 1)
// if there isn't timeZone option, then use session local timezone.
checkPartitionValues(files.head, "2016-12-01 08:00:00")
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 496f8c8..b32c547 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -804,7 +804,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
protected var metastorePath: File = _
protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
- private val pidDir: File = Utils.createTempDir("thriftserver-pid")
+ private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
protected var logPath: File = _
protected var operationLogPath: File = _
private var logTailingProcess: Process = _
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 2d31781..079fe45 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -330,7 +330,7 @@ class HiveSparkSubmitSuite
object SetMetastoreURLTest extends Logging {
def main(args: Array[String]): Unit = {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val sparkConf = new SparkConf(loadDefaults = true)
val builder = SparkSession.builder()
@@ -368,7 +368,7 @@ object SetMetastoreURLTest extends Logging {
object SetWarehouseLocationTest extends Logging {
def main(args: Array[String]): Unit = {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false")
val providedExpectedWarehouseLocation =
@@ -447,7 +447,7 @@ object SetWarehouseLocationTest extends Logging {
// can load the jar defined with the function.
object TemporaryHiveUDFTest extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
conf.set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
@@ -485,7 +485,7 @@ object TemporaryHiveUDFTest extends Logging {
// can load the jar defined with the function.
object PermanentHiveUDFTest1 extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
conf.set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
@@ -523,7 +523,7 @@ object PermanentHiveUDFTest1 extends Logging {
// can load the jar defined with the function.
object PermanentHiveUDFTest2 extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
conf.set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
@@ -558,7 +558,7 @@ object PermanentHiveUDFTest2 extends Logging {
// We test if we can load user jars in both driver and executors when HiveContext is used.
object SparkSubmitClassLoaderTest extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val conf = new SparkConf()
val hiveWarehouseLocation = Utils.createTempDir()
conf.set("spark.ui.enabled", "false")
@@ -628,7 +628,7 @@ object SparkSubmitClassLoaderTest extends Logging {
// We test if we can correctly set spark sql configurations when HiveContext is used.
object SparkSQLConfTest extends Logging {
def main(args: Array[String]) {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
// We override the SparkConf to add spark.sql.hive.metastore.version and
// spark.sql.hive.metastore.jars to the beginning of the conf entry array.
// So, if metadataHive get initialized after we set spark.sql.hive.metastore.version but
@@ -669,7 +669,7 @@ object SPARK_9757 extends QueryTest {
protected var spark: SparkSession = _
def main(args: Array[String]): Unit = {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val hiveWarehouseLocation = Utils.createTempDir()
val sparkContext = new SparkContext(
@@ -718,7 +718,7 @@ object SPARK_11009 extends QueryTest {
protected var spark: SparkSession = _
def main(args: Array[String]): Unit = {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val sparkContext = new SparkContext(
new SparkConf()
@@ -749,7 +749,7 @@ object SPARK_14244 extends QueryTest {
protected var spark: SparkSession = _
def main(args: Array[String]): Unit = {
- Utils.configTestLog4j("INFO")
+ TestUtils.configTestLog4j("INFO")
val sparkContext = new SparkContext(
new SparkConf()
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index ee2fd45..19b621f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -97,7 +97,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
val batchDurationMillis = batchDuration.milliseconds
// Setup the stream computation
- val checkpointDir = Utils.createTempDir(this.getClass.getSimpleName()).toString
+ val checkpointDir = Utils.createTempDir(namePrefix = this.getClass.getSimpleName()).toString
logDebug(s"Using checkpoint directory $checkpointDir")
val ssc = createContextForCheckpointOperation(batchDuration)
require(ssc.conf.get("spark.streaming.clock") === classOf[ManualClock].getName,
http://git-wip-us.apache.org/repos/asf/spark/blob/c99fc9ad/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 3b662ec..06c0c2a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -39,7 +39,7 @@ class MapWithStateSuite extends SparkFunSuite
before {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
- checkpointDir = Utils.createTempDir("checkpoint")
+ checkpointDir = Utils.createTempDir(namePrefix = "checkpoint")
}
after {