You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/01/07 23:35:43 UTC
[spark] branch master updated: [SPARK-26491][CORE][TEST] Use
ConfigEntry for hardcoded configs for test categories
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1a64152 [SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories
1a64152 is described below
commit 1a641525e60039cc6b10816e946cb6f44b3e2696
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Mon Jan 7 15:35:33 2019 -0800
[SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories
## What changes were proposed in this pull request?
This PR makes the hardcoded `spark.test` and `spark.testing` configs use `ConfigEntry` and puts them in the config package.
## How was this patch tested?
Existing unit tests (UTs).
Closes #23413 from mgaido91/SPARK-26491.
Authored-by: Marco Gaido <ma...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/ExecutorAllocationManager.scala | 4 +-
.../main/scala/org/apache/spark/SparkContext.scala | 3 +-
.../spark/deploy/history/FsHistoryProvider.scala | 3 +-
.../org/apache/spark/deploy/worker/Worker.scala | 4 +-
.../spark/executor/ProcfsMetricsGetter.scala | 2 +-
.../org/apache/spark/executor/TaskMetrics.scala | 3 +-
.../org/apache/spark/internal/config/Tests.scala | 56 ++++++++++++++++++++++
.../apache/spark/memory/StaticMemoryManager.scala | 5 +-
.../apache/spark/memory/UnifiedMemoryManager.scala | 7 +--
.../org/apache/spark/scheduler/DAGScheduler.scala | 3 +-
.../cluster/StandaloneSchedulerBackend.scala | 3 +-
.../org/apache/spark/util/SizeEstimator.scala | 5 +-
.../main/scala/org/apache/spark/util/Utils.scala | 5 +-
.../scala/org/apache/spark/DistributedSuite.scala | 5 +-
.../spark/ExecutorAllocationManagerSuite.scala | 3 +-
.../test/scala/org/apache/spark/ShuffleSuite.scala | 5 +-
.../scala/org/apache/spark/SparkFunSuite.scala | 3 +-
.../history/HistoryServerArgumentsSuite.scala | 6 +--
.../spark/deploy/history/HistoryServerSuite.scala | 7 +--
.../spark/memory/StaticMemoryManagerSuite.scala | 5 +-
.../spark/memory/UnifiedMemoryManagerSuite.scala | 33 ++++++-------
.../spark/scheduler/BarrierTaskContextSuite.scala | 7 +--
.../scheduler/BlacklistIntegrationSuite.scala | 19 ++++----
.../shuffle/sort/ShuffleExternalSorterSuite.scala | 5 +-
.../storage/BlockManagerReplicationSuite.scala | 9 ++--
.../apache/spark/storage/BlockManagerSuite.scala | 10 ++--
.../apache/spark/storage/MemoryStoreSuite.scala | 1 -
.../org/apache/spark/util/SizeEstimatorSuite.scala | 5 +-
.../collection/ExternalAppendOnlyMapSuite.scala | 3 +-
.../util/collection/ExternalSorterSuite.scala | 3 +-
.../integrationtest/KubernetesTestComponents.scala | 3 +-
.../mesos/MesosCoarseGrainedSchedulerBackend.scala | 3 +-
.../apache/spark/sql/execution/SQLExecution.scala | 4 +-
.../org/apache/spark/sql/BenchmarkQueryTest.scala | 3 +-
.../sql/execution/UnsafeRowSerializerSuite.scala | 3 +-
35 files changed, 165 insertions(+), 83 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d966582..0807e65 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
- conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+ conf.get(TEST_SCHEDULE_INTERVAL)
} else {
100
}
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
- val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89be9de..3a1e1b9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -470,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
- for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+ for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 709a380..3c56484 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
// Disable the background thread during tests.
- if (!conf.contains("spark.testing")) {
+ if (!conf.contains(IS_TESTING)) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index d5ea252..467df26 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -103,7 +104,6 @@ private[deploy] class Worker(
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
- private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
/**
@@ -127,7 +127,7 @@ private[deploy] class Worker(
private var connected = false
private val workerId = generateWorkerId()
private val sparkHome =
- if (testing) {
+ if (sys.props.contains(IS_TESTING.key)) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index af67f41..f354d60 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -43,7 +43,7 @@ private[spark] case class ProcfsMetrics(
// project.
private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
private val procfsStatFile = "stat"
- private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+ private val testing = Utils.isTesting
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 85b2745..ea79c73 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
}
// Only used for test
- private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+ private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
import InternalAccumulator._
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
new file mode 100644
index 0000000..21660ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Tests {
+
+ val TEST_USE_COMPRESSED_OOPS_KEY = "spark.test.useCompressedOops"
+
+ val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
+ .longConf
+ .createWithDefault(Runtime.getRuntime.maxMemory)
+
+ val TEST_SCHEDULE_INTERVAL =
+ ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+ .longConf
+ .createWithDefault(100)
+
+ val IS_TESTING = ConfigBuilder("spark.testing")
+ .booleanConf
+ .createOptional
+
+ val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
+ .booleanConf
+ .createWithDefault(false)
+
+ val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
+ .longConf
+ .createOptional
+
+ val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
+ .intConf
+ .createWithDefault(5)
+
+ val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
+ .intConf
+ .createWithDefault(4)
+
+ val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
+ .intConf
+ .createWithDefault(2)
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 0fd349d..7e052c0 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.BlockId
/**
@@ -120,7 +121,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val systemMaxMemory = conf.get(TEST_MEMORY)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -130,7 +131,7 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val systemMaxMemory = conf.get(TEST_MEMORY)
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 9260fd3..7801bb8 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.BlockId
/**
@@ -210,9 +211,9 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
- val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
- val reservedMemory = conf.getLong("spark.testing.reservedMemory",
- if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+ val systemMemory = conf.get(TEST_MEMORY)
+ val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
+ if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6f4c326..f6ade18 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
- private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+ private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
/**
* Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index adef20d..66080b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
- if (sys.props.contains("spark.testing")) {
+ if (sys.props.contains(IS_TESTING.key)) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3bfdf95..e12b6b7 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
import org.apache.spark.util.collection.OpenHashSet
/**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
private def getIsCompressedOops: Boolean = {
// This is only used by tests to override the detection of compressed oops. The test
// actually uses a system property instead of a SparkConf, so we'll stick with that.
- if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
+ if (System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY) != null) {
+ return System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY).toBoolean
}
// java.vm.info provides compressed ref info for IBM JDKs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3527fee..16ef381 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
* Indicates whether Spark is currently running unit tests.
*/
def isTesting: Boolean = {
- sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+ sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
}
/**
@@ -2175,7 +2176,7 @@ private[spark] object Utils extends Logging {
*/
def portMaxRetries(conf: SparkConf): Int = {
val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
- if (conf.contains("spark.testing")) {
+ if (conf.contains(IS_TESTING)) {
// Set a higher number of retries for tests...
maxRetries.getOrElse(100)
} else {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4083b20..21050e4 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.{Millis, Span}
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.security.EncryptionFunSuite
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val size = 10000
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", (size / 2).toString)
+ .set(TEST_MEMORY, size.toLong / 2)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
@@ -233,7 +234,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val numPartitions = 20
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
- .set("spark.testing.memory", size.toString)
+ .set(TEST_MEMORY, size.toLong)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 38f5e8c..6b310b9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.testing", "true")
// SPARK-22864: effectively disable the allocation schedule by setting the period to a
// really long value.
- .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
+ .set(TEST_SCHEDULE_INTERVAL, 10000L)
val sc = new SparkContext(conf)
contexts += sc
sc
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 35f728c..ffa7042 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
import org.scalatest.Matchers
import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
// test that the shuffle works (rather than retrying until all blocks are local to one Executor).
- conf.set("spark.test.noStageRetry", "true")
+ conf.set(TEST_NO_STAGE_RETRY, true)
test("groupByKey without compression") {
val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
- val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+ val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
sc = new SparkContext("local", "test", myConf)
val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index dad24d7..7d114b1 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}
/**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
protected val enableAutoThreadAudit = true
protected override def beforeAll(): Unit = {
- System.setProperty("spark.testing", "true")
+ System.setProperty(IS_TESTING.key, "true")
if (enableAutoThreadAudit) {
doThreadPreAudit()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 6b47987..5903ae7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.spark._
import org.apache.spark.internal.config.History._
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.config.Tests._
class HistoryServerArgumentsSuite extends SparkFunSuite {
@@ -31,14 +31,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
private val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
.set(UPDATE_INTERVAL_S, 1L)
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
assert(conf.get(UPDATE_INTERVAL_S) === 1L)
- assert(conf.get("spark.testing") === "true")
+ assert(conf.get(IS_TESTING).getOrElse(false))
}
test("Properties File Arguments Parsing --properties-file") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 96458c5..bb7d3c5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -47,6 +47,7 @@ import org.scalatest.selenium.WebBrowser
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.status.api.v1.ApplicationInfo
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.ui.SparkUI
@@ -81,7 +82,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir)
.set(UPDATE_INTERVAL_S.key, "0")
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
.set(EVENT_LOG_PROCESS_TREE_METRICS, true)
@@ -400,7 +401,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
*/
test("security manager starts with spark.authenticate set") {
val conf = new SparkConf()
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set(SecurityManager.SPARK_AUTH_CONF, "true")
HistoryServer.createSecurityManager(conf)
}
@@ -422,7 +423,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(UPDATE_INTERVAL_S.key, "1s")
.set(EVENT_LOG_ENABLED, true)
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
- .remove("spark.testing")
+ .remove(IS_TESTING)
val provider = new FsHistoryProvider(myConf)
val securityManager = HistoryServer.createSecurityManager(myConf)
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 0f32fe4..c3275ad 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.mockito.Mockito.when
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -48,8 +49,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
new StaticMemoryManager(
conf.clone
.set("spark.memory.fraction", "1")
- .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
+ .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+ .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxOnHeapStorageMemory = 0,
numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 5ce3453..8556e92 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -43,8 +44,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
- .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
+ .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+ .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
@@ -218,19 +219,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
}
test("small heap") {
- val systemMemory = 1024 * 1024
- val reservedMemory = 300 * 1024
+ val systemMemory = 1024L * 1024
+ val reservedMemory = 300L * 1024
val memoryFraction = 0.8
val conf = new SparkConf()
.set("spark.memory.fraction", memoryFraction.toString)
- .set("spark.testing.memory", systemMemory.toString)
- .set("spark.testing.reservedMemory", reservedMemory.toString)
+ .set(TEST_MEMORY, systemMemory)
+ .set(TEST_RESERVED_MEMORY, reservedMemory)
val mm = UnifiedMemoryManager(conf, numCores = 1)
val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
assert(mm.maxHeapMemory === expectedMaxMemory)
// Try using a system memory that's too small
- val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+ val conf2 = conf.clone().set(TEST_MEMORY, reservedMemory / 2)
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
@@ -238,13 +239,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
}
test("insufficient executor memory") {
- val systemMemory = 1024 * 1024
- val reservedMemory = 300 * 1024
+ val systemMemory = 1024L * 1024
+ val reservedMemory = 300L * 1024
val memoryFraction = 0.8
val conf = new SparkConf()
.set("spark.memory.fraction", memoryFraction.toString)
- .set("spark.testing.memory", systemMemory.toString)
- .set("spark.testing.reservedMemory", reservedMemory.toString)
+ .set(TEST_MEMORY, systemMemory)
+ .set(TEST_RESERVED_MEMORY, reservedMemory)
val mm = UnifiedMemoryManager(conf, numCores = 1)
// Try using an executor memory that's too small
@@ -259,7 +260,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
- .set("spark.testing.memory", "1000")
+ .set(TEST_MEMORY, 1000L)
val mm = UnifiedMemoryManager(conf, numCores = 2)
val ms = makeMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
@@ -285,7 +286,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
- .set("spark.testing.memory", "1000")
+ .set(TEST_MEMORY, 1000L)
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
@@ -306,9 +307,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
- .set(MEMORY_OFFHEAP_SIZE.key, "1000")
- .set("spark.testing.memory", "1000")
- .set(MEMORY_OFFHEAP_ENABLED.key, "true")
+ .set(MEMORY_OFFHEAP_SIZE, 1000L)
+ .set(TEST_MEMORY, 1000L)
+ .set(MEMORY_OFFHEAP_ENABLED, true)
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 36dd620..112fd31 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
@@ -76,7 +77,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
test("throw exception on barrier() call timeout") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
@@ -101,7 +102,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
test("throw exception if barrier() call doesn't happen on every task") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
@@ -124,7 +125,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
test("throw exception if the number of barrier() calls are not the same on every task") {
val conf = new SparkConf()
.set("spark.barrier.sync.timeout", "1")
- .set("spark.test.noStageRetry", "true")
+ .set(TEST_NO_STAGE_RETRY, true)
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 29bb823..2215f7f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
import org.apache.spark._
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
@@ -58,9 +59,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
config.MAX_TASK_FAILURES.key -> "4",
- "spark.testing.nHosts" -> "2",
- "spark.testing.nExecutorsPerHost" -> "5",
- "spark.testing.nCoresPerExecutor" -> "10"
+ TEST_N_HOSTS.key -> "2",
+ TEST_N_EXECUTORS_HOST.key -> "5",
+ TEST_N_CORES_EXECUTOR.key -> "10"
)
) {
// To reliably reproduce the failure that would occur without blacklisting, we have to use 1
@@ -102,9 +103,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
- "spark.testing.nHosts" -> "2",
- "spark.testing.nExecutorsPerHost" -> "1",
- "spark.testing.nCoresPerExecutor" -> "1",
+ TEST_N_HOSTS.key -> "2",
+ TEST_N_EXECUTORS_HOST.key -> "1",
+ TEST_N_CORES_EXECUTOR.key -> "1",
"spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
)
) {
@@ -129,9 +130,9 @@ class MultiExecutorMockBackend(
conf: SparkConf,
taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {
- val nHosts = conf.getInt("spark.testing.nHosts", 5)
- val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
- val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)
+ val nHosts = conf.get(TEST_N_HOSTS)
+ val nExecutorsPerHost = conf.get(TEST_N_EXECUTORS_HOST)
+ val nCoresPerExecutor = conf.get(TEST_N_CORES_EXECUTOR)
override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
(0 until nHosts).flatMap { hostIdx =>
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index b9f0e87..43621cb 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory._
import org.apache.spark.unsafe.Platform
@@ -33,8 +34,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("ShuffleExternalSorterSuite")
- .set("spark.testing", "true")
- .set("spark.testing.memory", "1600")
+ .set(IS_TESTING, true)
+ .set(TEST_MEMORY, 1600L)
.set("spark.memory.fraction", "1")
sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 19116cf..480e07f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,8 +70,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- conf.set("spark.testing.memory", maxMem.toString)
- conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+ conf.set(TEST_MEMORY, maxMem)
+ conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
@@ -87,7 +88,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set("spark.authenticate", "false")
conf.set(DRIVER_PORT, rpcEnv.address.port)
- conf.set("spark.testing", "true")
+ conf.set(IS_TESTING, true)
conf.set("spark.memory.fraction", "1")
conf.set("spark.memory.storageFraction", "1")
conf.set("spark.storage.unrollFraction", "0.4")
@@ -233,7 +234,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
- conf.set("spark.testing.memory", "10000")
+ conf.set(TEST_MEMORY, 10000L)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a7bb2a0..bda8136 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -89,8 +90,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
transferService: Option[BlockTransferService] = Option.empty,
testConf: Option[SparkConf] = None): BlockManager = {
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
- bmConf.set("spark.testing.memory", maxMem.toString)
- bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+ bmConf.set(TEST_MEMORY, maxMem)
+ bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
@@ -115,11 +116,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
System.setProperty("os.arch", "amd64")
conf = new SparkConf(false)
.set("spark.app.id", "test")
- .set("spark.testing", "true")
+ .set(IS_TESTING, true)
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "1")
.set("spark.kryoserializer.buffer", "1m")
- .set("spark.test.useCompressedOops", "true")
.set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
@@ -901,7 +901,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- conf.set("spark.testing.memory", "1200")
+ conf.set(TEST_MEMORY, 1200L)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index baff672..b02af2b 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -39,7 +39,6 @@ class MemoryStoreSuite
with ResetSystemProperties {
var conf: SparkConf = new SparkConf(false)
- .set("spark.test.useCompressedOops", "true")
.set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 63f9f82..8bc62db 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
class DummyClass1 {}
@@ -76,7 +77,7 @@ class SizeEstimatorSuite
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
super.beforeEach()
System.setProperty("os.arch", "amd64")
- System.setProperty("spark.test.useCompressedOops", "true")
+ System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "true")
}
override def afterEach(): Unit = {
@@ -192,7 +193,7 @@ class SizeEstimatorSuite
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("64-bit arch with no compressed oops") {
System.setProperty("os.arch", "amd64")
- System.setProperty("spark.test.useCompressedOops", "false")
+ System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 35fba1a..6211399 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark._
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.util.CompletionIterator
@@ -552,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val conf = createSparkConf(loadDefaults = false)
.set("spark.shuffle.memoryFraction", "0.01")
.set("spark.memory.useLegacyMode", "true")
- .set("spark.testing.memory", "100000000")
+ .set(TEST_MEMORY, 100000000L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
val N = 2e5.toInt
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 47173b8..aa400dd 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
@@ -639,7 +640,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = false, kryo = false)
.set("spark.shuffle.memoryFraction", "0.01")
.set("spark.memory.useLegacyMode", "true")
- .set("spark.testing.memory", "100000000")
+ .set(TEST_MEMORY, 100000000L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
val N = 2e5.toInt
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index c0b435e..cc89683 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
@@ -67,7 +68,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
.set("spark.executors.instances", "1")
.set("spark.app.name", "spark-test-app")
.set("spark.ui.enabled", "true")
- .set("spark.testing", "false")
+ .set(IS_TESTING, false)
.set("spark.kubernetes.submission.waitAppCompletion", "false")
.set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName)
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 03cd258..fb23535 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -298,7 +299,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
protected def driverURL: String = {
- if (conf.contains("spark.testing")) {
+ if (conf.contains(IS_TESTING)) {
"driverURL"
} else {
RpcEndpointAddress(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index dda7cb5..5b38fe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
-import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -38,7 +38,7 @@ object SQLExecution {
executionIdToQueryExecution.get(executionId)
}
- private val testing = sys.props.contains("spark.testing")
+ private val testing = sys.props.contains(IS_TESTING.key)
private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
val sc = sparkSession.sparkContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
index d95794d..c37d663 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
@@ -29,7 +30,7 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with B
// When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting
// the max iteration of analyzer/optimizer batches.
- assert(Utils.isTesting, "spark.testing is not set to true")
+ assert(Utils.isTesting, s"${IS_TESTING.key} is not set to true")
/**
* Drop all the tables
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index ca86922..963e425 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
import java.util.Properties
import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
@@ -99,7 +100,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
val conf = new SparkConf()
.set("spark.shuffle.spill.initialMemoryThreshold", "1")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
- .set("spark.testing.memory", "80000")
+ .set(TEST_MEMORY, 80000L)
spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
outputFile.deleteOnExit()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org