Posted to commits@spark.apache.org by va...@apache.org on 2019/01/07 23:35:43 UTC

[spark] branch master updated: [SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a64152  [SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories
1a64152 is described below

commit 1a641525e60039cc6b10816e946cb6f44b3e2696
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Mon Jan 7 15:35:33 2019 -0800

    [SPARK-26491][CORE][TEST] Use ConfigEntry for hardcoded configs for test categories
    
    ## What changes were proposed in this pull request?
    
    The PR converts the hardcoded `spark.test` and `spark.testing` configs to `ConfigEntry` instances and puts them in the config package.
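    
    As an illustrative sketch (hypothetical, not part of the patch) of the pattern applied throughout: the new `Tests` object defines typed `ConfigEntry`s, and call sites switch from raw string keys to typed `get`/`set`/`contains`. The entries and the typed setters are `private[spark]`, so the sketch assumes it compiles inside the Spark code base; the object and method names below are made up for illustration.
    
    ```scala
    package org.apache.spark   // required: Tests and the typed SparkConf setters are private[spark]
    
    import org.apache.spark.internal.config.Tests.{IS_TESTING, TEST_MEMORY}
    
    object ConfigEntryUsageSketch {   // hypothetical object, for illustration only
      def maxMemory(conf: SparkConf): Long = {
        // Before: conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
        // After: the entry carries the key, the value type and the default.
        conf.get(TEST_MEMORY)
      }
    
      def isTesting(conf: SparkConf): Boolean = {
        // Before: conf.contains("spark.testing")
        conf.contains(IS_TESTING)
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .set(TEST_MEMORY, 1600L)   // typed; was .set("spark.testing.memory", "1600")
          .set(IS_TESTING, true)     // typed; was .set("spark.testing", "true")
        assert(maxMemory(conf) == 1600L)
        assert(isTesting(conf))
      }
    }
    ```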
    
    ## How was this patch tested?
    
    existing UTs
    
    Closes #23413 from mgaido91/SPARK-26491.
    
    Authored-by: Marco Gaido <ma...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  4 +-
 .../main/scala/org/apache/spark/SparkContext.scala |  3 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |  3 +-
 .../org/apache/spark/deploy/worker/Worker.scala    |  4 +-
 .../spark/executor/ProcfsMetricsGetter.scala       |  2 +-
 .../org/apache/spark/executor/TaskMetrics.scala    |  3 +-
 .../org/apache/spark/internal/config/Tests.scala   | 56 ++++++++++++++++++++++
 .../apache/spark/memory/StaticMemoryManager.scala  |  5 +-
 .../apache/spark/memory/UnifiedMemoryManager.scala |  7 +--
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  3 +-
 .../cluster/StandaloneSchedulerBackend.scala       |  3 +-
 .../org/apache/spark/util/SizeEstimator.scala      |  5 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  5 +-
 .../scala/org/apache/spark/DistributedSuite.scala  |  5 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |  3 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala |  5 +-
 .../scala/org/apache/spark/SparkFunSuite.scala     |  3 +-
 .../history/HistoryServerArgumentsSuite.scala      |  6 +--
 .../spark/deploy/history/HistoryServerSuite.scala  |  7 +--
 .../spark/memory/StaticMemoryManagerSuite.scala    |  5 +-
 .../spark/memory/UnifiedMemoryManagerSuite.scala   | 33 ++++++-------
 .../spark/scheduler/BarrierTaskContextSuite.scala  |  7 +--
 .../scheduler/BlacklistIntegrationSuite.scala      | 19 ++++----
 .../shuffle/sort/ShuffleExternalSorterSuite.scala  |  5 +-
 .../storage/BlockManagerReplicationSuite.scala     |  9 ++--
 .../apache/spark/storage/BlockManagerSuite.scala   | 10 ++--
 .../apache/spark/storage/MemoryStoreSuite.scala    |  1 -
 .../org/apache/spark/util/SizeEstimatorSuite.scala |  5 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    |  3 +-
 .../util/collection/ExternalSorterSuite.scala      |  3 +-
 .../integrationtest/KubernetesTestComponents.scala |  3 +-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |  3 +-
 .../apache/spark/sql/execution/SQLExecution.scala  |  4 +-
 .../org/apache/spark/sql/BenchmarkQueryTest.scala  |  3 +-
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 +-
 35 files changed, 165 insertions(+), 83 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d966582..0807e65 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,6 +27,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -157,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
 
   // Polling loop interval (ms)
   private val intervalMillis: Long = if (Utils.isTesting) {
-      conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+      conf.get(TEST_SCHEDULE_INTERVAL)
     } else {
       100
     }
@@ -899,5 +900,4 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
-  val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89be9de..3a1e1b9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -470,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Convert java options to env vars as a work around
     // since we can't set env vars directly in sbt.
-    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+    for { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))
       value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
       executorEnvs(envKey) = value
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 709a380..3c56484 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -267,7 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     // Disable the background thread during tests.
-    if (!conf.contains("spark.testing")) {
+    if (!conf.contains(IS_TESTING)) {
       // A task that periodically checks for event log updates on disk.
       logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
       pool.scheduleWithFixedDelay(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index d5ea252..467df26 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -103,7 +104,6 @@ private[deploy] class Worker(
   private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
     conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
 
-  private val testing: Boolean = sys.props.contains("spark.testing")
   private var master: Option[RpcEndpointRef] = None
 
   /**
@@ -127,7 +127,7 @@ private[deploy] class Worker(
   private var connected = false
   private val workerId = generateWorkerId()
   private val sparkHome =
-    if (testing) {
+    if (sys.props.contains(IS_TESTING.key)) {
       assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
       new File(sys.props("spark.test.home"))
     } else {
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index af67f41..f354d60 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -43,7 +43,7 @@ private[spark] case class ProcfsMetrics(
 // project.
 private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
   private val procfsStatFile = "stat"
-  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  private val testing = Utils.isTesting
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 85b2745..ea79c73 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
@@ -202,7 +203,7 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   // Only used for test
-  private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+  private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
 
 
   import InternalAccumulator._
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
new file mode 100644
index 0000000..21660ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Tests {
+
+  val TEST_USE_COMPRESSED_OOPS_KEY = "spark.test.useCompressedOops"
+
+  val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
+    .longConf
+    .createWithDefault(Runtime.getRuntime.maxMemory)
+
+  val TEST_SCHEDULE_INTERVAL =
+    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+      .longConf
+      .createWithDefault(100)
+
+  val IS_TESTING = ConfigBuilder("spark.testing")
+    .booleanConf
+    .createOptional
+
+  val TEST_NO_STAGE_RETRY = ConfigBuilder("spark.test.noStageRetry")
+    .booleanConf
+    .createWithDefault(false)
+
+  val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
+    .longConf
+    .createOptional
+
+  val TEST_N_HOSTS = ConfigBuilder("spark.testing.nHosts")
+    .intConf
+    .createWithDefault(5)
+
+  val TEST_N_EXECUTORS_HOST = ConfigBuilder("spark.testing.nExecutorsPerHost")
+    .intConf
+    .createWithDefault(4)
+
+  val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
+    .intConf
+    .createWithDefault(2)
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 0fd349d..7e052c0 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.storage.BlockId
 
 /**
@@ -120,7 +121,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the storage region, in bytes.
    */
   private def getMaxStorageMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
     (systemMaxMemory * memoryFraction * safetyFraction).toLong
@@ -130,7 +131,7 @@ private[spark] object StaticMemoryManager {
    * Return the total amount of memory available for the execution region, in bytes.
    */
   private def getMaxExecutionMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMaxMemory = conf.get(TEST_MEMORY)
 
     if (systemMaxMemory < MIN_MEMORY_BYTES) {
       throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 9260fd3..7801bb8 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.storage.BlockId
 
 /**
@@ -210,9 +211,9 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
-    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
-      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val systemMemory = conf.get(TEST_MEMORY)
+    val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
+      if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
     val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6f4c326..f6ade18 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,6 +38,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
@@ -186,7 +187,7 @@ private[spark] class DAGScheduler(
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
 
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
-  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+  private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
 
   /**
    * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index adef20d..66080b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
@@ -90,7 +91,7 @@ private[spark] class StandaloneSchedulerBackend(
     // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
     // when the assembly is built with the "*-provided" profiles enabled.
     val testingClassPath =
-      if (sys.props.contains("spark.testing")) {
+      if (sys.props.contains(IS_TESTING.key)) {
         sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
       } else {
         Nil
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3bfdf95..e12b6b7 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -28,6 +28,7 @@ import com.google.common.collect.MapMaker
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -126,8 +127,8 @@ object SizeEstimator extends Logging {
   private def getIsCompressedOops: Boolean = {
     // This is only used by tests to override the detection of compressed oops. The test
     // actually uses a system property instead of a SparkConf, so we'll stick with that.
-    if (System.getProperty("spark.test.useCompressedOops") != null) {
-      return System.getProperty("spark.test.useCompressedOops").toBoolean
+    if (System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY) != null) {
+      return System.getProperty(TEST_USE_COMPRESSED_OOPS_KEY).toBoolean
     }
 
     // java.vm.info provides compressed ref info for IBM JDKs
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3527fee..16ef381 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1847,7 +1848,7 @@ private[spark] object Utils extends Logging {
    * Indicates whether Spark is currently running unit tests.
    */
   def isTesting: Boolean = {
-    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+    sys.env.contains("SPARK_TESTING") || sys.props.contains(IS_TESTING.key)
   }
 
   /**
@@ -2175,7 +2176,7 @@ private[spark] object Utils extends Logging {
    */
   def portMaxRetries(conf: SparkConf): Int = {
     val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
-    if (conf.contains("spark.testing")) {
+    if (conf.contains(IS_TESTING)) {
       // Set a higher number of retries for tests...
       maxRetries.getOrElse(100)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4083b20..21050e4 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.security.EncryptionFunSuite
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -217,7 +218,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val size = 10000
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
-      .set("spark.testing.memory", (size / 2).toString)
+      .set(TEST_MEMORY, size.toLong / 2)
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
@@ -233,7 +234,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val numPartitions = 20
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
-      .set("spark.testing.memory", size.toString)
+      .set(TEST_MEMORY, size.toLong)
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 38f5e8c..6b310b9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1166,7 +1167,7 @@ class ExecutorAllocationManagerSuite
       .set("spark.dynamicAllocation.testing", "true")
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
-      .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
+      .set(TEST_SCHEDULE_INTERVAL, 10000L)
     val sc = new SparkContext(conf)
     contexts += sc
     sc
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 35f728c..ffa7042 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
 import org.scalatest.Matchers
 
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
@@ -37,7 +38,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 
   // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
   // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
-  conf.set("spark.test.noStageRetry", "true")
+  conf.set(TEST_NO_STAGE_RETRY, true)
 
   test("groupByKey without compression") {
     val myConf = conf.clone().set("spark.shuffle.compress", "false")
@@ -269,7 +270,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
-    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    val myConf = conf.clone().set(TEST_NO_STAGE_RETRY, false)
     sc = new SparkContext("local", "test", myConf)
     val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
     rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index dad24d7..7d114b1 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,6 +23,7 @@ import java.io.File
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.{AccumulatorContext, Utils}
 
 /**
@@ -59,7 +60,7 @@ abstract class SparkFunSuite
   protected val enableAutoThreadAudit = true
 
   protected override def beforeAll(): Unit = {
-    System.setProperty("spark.testing", "true")
+    System.setProperty(IS_TESTING.key, "true")
     if (enableAutoThreadAudit) {
       doThreadPreAudit()
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 6b47987..5903ae7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 
 import org.apache.spark._
 import org.apache.spark.internal.config.History._
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.config.Tests._
 
 class HistoryServerArgumentsSuite extends SparkFunSuite {
 
@@ -31,14 +31,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
   private val conf = new SparkConf()
     .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
     .set(UPDATE_INTERVAL_S, 1L)
-    .set("spark.testing", "true")
+    .set(IS_TESTING, true)
 
   test("No Arguments Parsing") {
     val argStrings = Array.empty[String]
     val hsa = new HistoryServerArguments(conf, argStrings)
     assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
     assert(conf.get(UPDATE_INTERVAL_S) === 1L)
-    assert(conf.get("spark.testing") === "true")
+    assert(conf.get(IS_TESTING).getOrElse(false))
   }
 
   test("Properties File Arguments Parsing --properties-file") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 96458c5..bb7d3c5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -47,6 +47,7 @@ import org.scalatest.selenium.WebBrowser
 import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.ui.SparkUI
@@ -81,7 +82,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     val conf = new SparkConf()
       .set(HISTORY_LOG_DIR, logDir)
       .set(UPDATE_INTERVAL_S.key, "0")
-      .set("spark.testing", "true")
+      .set(IS_TESTING, true)
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
       .set(EVENT_LOG_PROCESS_TREE_METRICS, true)
@@ -400,7 +401,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
    */
   test("security manager starts with spark.authenticate set") {
     val conf = new SparkConf()
-      .set("spark.testing", "true")
+      .set(IS_TESTING, true)
       .set(SecurityManager.SPARK_AUTH_CONF, "true")
     HistoryServer.createSecurityManager(conf)
   }
@@ -422,7 +423,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       .set(UPDATE_INTERVAL_S.key, "1s")
       .set(EVENT_LOG_ENABLED, true)
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
-      .remove("spark.testing")
+      .remove(IS_TESTING)
     val provider = new FsHistoryProvider(myConf)
     val securityManager = HistoryServer.createSecurityManager(myConf)
 
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 0f32fe4..c3275ad 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
 
@@ -48,8 +49,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
     new StaticMemoryManager(
       conf.clone
         .set("spark.memory.fraction", "1")
-        .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-        .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
+        .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+        .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxOnHeapStorageMemory = 0,
       numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 5ce3453..8556e92 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
 
@@ -43,8 +44,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
       maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
-      .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-      .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
+      .set(TEST_MEMORY, maxOnHeapExecutionMemory)
+      .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
       .set("spark.memory.storageFraction", storageFraction.toString)
     UnifiedMemoryManager(conf, numCores = 1)
   }
@@ -218,19 +219,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
   }
 
   test("small heap") {
-    val systemMemory = 1024 * 1024
-    val reservedMemory = 300 * 1024
+    val systemMemory = 1024L * 1024
+    val reservedMemory = 300L * 1024
     val memoryFraction = 0.8
     val conf = new SparkConf()
       .set("spark.memory.fraction", memoryFraction.toString)
-      .set("spark.testing.memory", systemMemory.toString)
-      .set("spark.testing.reservedMemory", reservedMemory.toString)
+      .set(TEST_MEMORY, systemMemory)
+      .set(TEST_RESERVED_MEMORY, reservedMemory)
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
     assert(mm.maxHeapMemory === expectedMaxMemory)
 
     // Try using a system memory that's too small
-    val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+    val conf2 = conf.clone().set(TEST_MEMORY, reservedMemory / 2)
     val exception = intercept[IllegalArgumentException] {
       UnifiedMemoryManager(conf2, numCores = 1)
     }
@@ -238,13 +239,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
   }
 
   test("insufficient executor memory") {
-    val systemMemory = 1024 * 1024
-    val reservedMemory = 300 * 1024
+    val systemMemory = 1024L * 1024
+    val reservedMemory = 300L * 1024
     val memoryFraction = 0.8
     val conf = new SparkConf()
       .set("spark.memory.fraction", memoryFraction.toString)
-      .set("spark.testing.memory", systemMemory.toString)
-      .set("spark.testing.reservedMemory", reservedMemory.toString)
+      .set(TEST_MEMORY, systemMemory)
+      .set(TEST_RESERVED_MEMORY, reservedMemory)
     val mm = UnifiedMemoryManager(conf, numCores = 1)
 
     // Try using an executor memory that's too small
@@ -259,7 +260,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
       .set("spark.memory.storageFraction", "0")
-      .set("spark.testing.memory", "1000")
+      .set(TEST_MEMORY, 1000L)
     val mm = UnifiedMemoryManager(conf, numCores = 2)
     val ms = makeMemoryStore(mm)
     val memoryMode = MemoryMode.ON_HEAP
@@ -285,7 +286,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
       .set("spark.memory.storageFraction", "0")
-      .set("spark.testing.memory", "1000")
+      .set(TEST_MEMORY, 1000L)
     val mm = UnifiedMemoryManager(conf, numCores = 2)
     makeBadMemoryStore(mm)
     val memoryMode = MemoryMode.ON_HEAP
@@ -306,9 +307,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
 
   test("not enough free memory in the storage pool --OFF_HEAP") {
     val conf = new SparkConf()
-      .set(MEMORY_OFFHEAP_SIZE.key, "1000")
-      .set("spark.testing.memory", "1000")
-      .set(MEMORY_OFFHEAP_ENABLED.key, "true")
+      .set(MEMORY_OFFHEAP_SIZE, 1000L)
+      .set(TEST_MEMORY, 1000L)
+      .set(MEMORY_OFFHEAP_ENABLED, true)
     val taskAttemptId = 0L
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val ms = makeMemoryStore(mm)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 36dd620..112fd31 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -76,7 +77,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   test("throw exception on barrier() call timeout") {
     val conf = new SparkConf()
       .set("spark.barrier.sync.timeout", "1")
-      .set("spark.test.noStageRetry", "true")
+      .set(TEST_NO_STAGE_RETRY, true)
       .setMaster("local-cluster[4, 1, 1024]")
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
@@ -101,7 +102,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   test("throw exception if barrier() call doesn't happen on every task") {
     val conf = new SparkConf()
       .set("spark.barrier.sync.timeout", "1")
-      .set("spark.test.noStageRetry", "true")
+      .set(TEST_NO_STAGE_RETRY, true)
       .setMaster("local-cluster[4, 1, 1024]")
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
@@ -124,7 +125,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
   test("throw exception if the number of barrier() calls are not the same on every task") {
     val conf = new SparkConf()
       .set("spark.barrier.sync.timeout", "1")
-      .set("spark.test.noStageRetry", "true")
+      .set(TEST_NO_STAGE_RETRY, true)
       .setMaster("local-cluster[4, 1, 1024]")
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 29bb823..2215f7f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests._
 
 class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{
 
@@ -58,9 +59,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     extraConfs = Seq(
       config.BLACKLIST_ENABLED.key -> "true",
       config.MAX_TASK_FAILURES.key -> "4",
-      "spark.testing.nHosts" -> "2",
-      "spark.testing.nExecutorsPerHost" -> "5",
-      "spark.testing.nCoresPerExecutor" -> "10"
+      TEST_N_HOSTS.key -> "2",
+      TEST_N_EXECUTORS_HOST.key -> "5",
+      TEST_N_CORES_EXECUTOR.key -> "10"
     )
   ) {
     // To reliably reproduce the failure that would occur without blacklisting, we have to use 1
@@ -102,9 +103,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     "SPARK-15865 Progress with fewer executors than maxTaskFailures",
     extraConfs = Seq(
       config.BLACKLIST_ENABLED.key -> "true",
-      "spark.testing.nHosts" -> "2",
-      "spark.testing.nExecutorsPerHost" -> "1",
-      "spark.testing.nCoresPerExecutor" -> "1",
+      TEST_N_HOSTS.key -> "2",
+      TEST_N_EXECUTORS_HOST.key -> "1",
+      TEST_N_CORES_EXECUTOR.key -> "1",
       "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
     )
   ) {
@@ -129,9 +130,9 @@ class MultiExecutorMockBackend(
     conf: SparkConf,
     taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {
 
-  val nHosts = conf.getInt("spark.testing.nHosts", 5)
-  val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
-  val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)
+  val nHosts = conf.get(TEST_N_HOSTS)
+  val nExecutorsPerHost = conf.get(TEST_N_EXECUTORS_HOST)
+  val nCoresPerExecutor = conf.get(TEST_N_CORES_EXECUTOR)
 
   override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
     (0 until nHosts).flatMap { hostIdx =>
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index b9f0e87..43621cb 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory._
 import org.apache.spark.unsafe.Platform
 
@@ -33,8 +34,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
     val conf = new SparkConf()
       .setMaster("local[1]")
       .setAppName("ShuffleExternalSorterSuite")
-      .set("spark.testing", "true")
-      .set("spark.testing.memory", "1600")
+      .set(IS_TESTING, true)
+      .set(TEST_MEMORY, 1600L)
       .set("spark.memory.fraction", "1")
     sc = new SparkContext(conf)
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 19116cf..480e07f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,8 +70,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
   protected def makeBlockManager(
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
-    conf.set("spark.testing.memory", maxMem.toString)
-    conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+    conf.set(TEST_MEMORY, maxMem)
+    conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
@@ -87,7 +88,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 
     conf.set("spark.authenticate", "false")
     conf.set(DRIVER_PORT, rpcEnv.address.port)
-    conf.set("spark.testing", "true")
+    conf.set(IS_TESTING, true)
     conf.set("spark.memory.fraction", "1")
     conf.set("spark.memory.storageFraction", "1")
     conf.set("spark.storage.unrollFraction", "0.4")
@@ -233,7 +234,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
-    conf.set("spark.testing.memory", "10000")
+    conf.set(TEST_MEMORY, 10000L)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a7bb2a0..bda8136 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -89,8 +90,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       transferService: Option[BlockTransferService] = Option.empty,
       testConf: Option[SparkConf] = None): BlockManager = {
     val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
-    bmConf.set("spark.testing.memory", maxMem.toString)
-    bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
+    bmConf.set(TEST_MEMORY, maxMem)
+    bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
     val serializer = new KryoSerializer(bmConf)
     val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
       Some(CryptoStreamUtils.createKey(bmConf))
@@ -115,11 +116,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     System.setProperty("os.arch", "amd64")
     conf = new SparkConf(false)
       .set("spark.app.id", "test")
-      .set("spark.testing", "true")
+      .set(IS_TESTING, true)
       .set("spark.memory.fraction", "1")
       .set("spark.memory.storageFraction", "1")
       .set("spark.kryoserializer.buffer", "1m")
-      .set("spark.test.useCompressedOops", "true")
       .set("spark.storage.unrollFraction", "0.4")
       .set("spark.storage.unrollMemoryThreshold", "512")
 
@@ -901,7 +901,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
-    conf.set("spark.testing.memory", "1200")
+    conf.set(TEST_MEMORY, 1200L)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index baff672..b02af2b 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -39,7 +39,6 @@ class MemoryStoreSuite
   with ResetSystemProperties {
 
   var conf: SparkConf = new SparkConf(false)
-    .set("spark.test.useCompressedOops", "true")
     .set("spark.storage.unrollFraction", "0.4")
     .set("spark.storage.unrollMemoryThreshold", "512")
 
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 63f9f82..8bc62db 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY
 
 class DummyClass1 {}
 
@@ -76,7 +77,7 @@ class SizeEstimatorSuite
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     super.beforeEach()
     System.setProperty("os.arch", "amd64")
-    System.setProperty("spark.test.useCompressedOops", "true")
+    System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "true")
   }
 
   override def afterEach(): Unit = {
@@ -192,7 +193,7 @@ class SizeEstimatorSuite
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("64-bit arch with no compressed oops") {
     System.setProperty("os.arch", "amd64")
-    System.setProperty("spark.test.useCompressedOops", "false")
+    System.setProperty(TEST_USE_COMPRESSED_OOPS_KEY, "false")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
 
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 35fba1a..6211399 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.util.CompletionIterator
@@ -552,7 +553,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val conf = createSparkConf(loadDefaults = false)
       .set("spark.shuffle.memoryFraction", "0.01")
       .set("spark.memory.useLegacyMode", "true")
-      .set("spark.testing.memory", "100000000")
+      .set(TEST_MEMORY, 100000000L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
     val N = 2e5.toInt
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 47173b8..aa400dd 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.unsafe.array.LongArray
@@ -639,7 +640,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
       .set("spark.shuffle.memoryFraction", "0.01")
       .set("spark.memory.useLegacyMode", "true")
-      .set("spark.testing.memory", "100000000")
+      .set(TEST_MEMORY, 100000000L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
     val N = 2e5.toInt
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index c0b435e..cc89683 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests.IS_TESTING
 
 private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
 
@@ -67,7 +68,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
       .set("spark.executors.instances", "1")
       .set("spark.app.name", "spark-test-app")
       .set("spark.ui.enabled", "true")
-      .set("spark.testing", "false")
+      .set(IS_TESTING, false)
       .set("spark.kubernetes.submission.waitAppCompletion", "false")
       .set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName)
   }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 03cd258..fb23535 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -298,7 +299,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   protected def driverURL: String = {
-    if (conf.contains("spark.testing")) {
+    if (conf.contains(IS_TESTING)) {
       "driverURL"
     } else {
       RpcEndpointAddress(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index dda7cb5..5b38fe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 
@@ -38,7 +38,7 @@ object SQLExecution {
     executionIdToQueryExecution.get(executionId)
   }
 
-  private val testing = sys.props.contains("spark.testing")
+  private val testing = sys.props.contains(IS_TESTING.key)
 
   private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
     val sc = sparkSession.sparkContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
index d95794d..c37d663 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
@@ -29,7 +30,7 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with B
 
   // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting
   // the max iteration of analyzer/optimizer batches.
-  assert(Utils.isTesting, "spark.testing is not set to true")
+  assert(Utils.isTesting, s"${IS_TESTING.key} is not set to true")
 
   /**
    * Drop all the tables
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index ca86922..963e425 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
 import java.util.Properties
 
 import org.apache.spark._
+import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
@@ -99,7 +100,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
     val conf = new SparkConf()
       .set("spark.shuffle.spill.initialMemoryThreshold", "1")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
-      .set("spark.testing.memory", "80000")
+      .set(TEST_MEMORY, 80000L)
     spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
     val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
     outputFile.deleteOnExit()

