You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/02/01 15:46:37 UTC

[spark] branch branch-3.4 updated: [3.4][SPARK-42229][CORE] Migrate `SparkCoreErrors` into error classes

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new c4f750b1e22 [3.4][SPARK-42229][CORE] Migrate `SparkCoreErrors` into error classes
c4f750b1e22 is described below

commit c4f750b1e221d34c8aaf78d8ead8383c0bb8aeaa
Author: itholic <ha...@databricks.com>
AuthorDate: Wed Feb 1 18:46:02 2023 +0300

    [3.4][SPARK-42229][CORE] Migrate `SparkCoreErrors` into error classes
    
    ### What changes were proposed in this pull request?
    
    Backport of https://github.com/apache/spark/pull/39791 to branch-3.4.
    
    ### Why are the changes needed?
    
    We want to include PRs related to error classes in Spark 3.4.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The CI should pass.
    
    Closes #39832 from itholic/42229-3.4.
    
    Authored-by: itholic <ha...@databricks.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 222 ++++++++++++++++++
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 260 +++++++++++++++------
 2 files changed, 415 insertions(+), 67 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index f0bbc26aae1..1cad00ad417 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -5288,5 +5288,227 @@
     "message" : [
       "Operation not allowed: <cmd> only works on table with location provided: <tableIdentWithDB>"
     ]
+  },
+  "_LEGACY_ERROR_TEMP_3000" : {
+    "message" : [
+      "Unexpected Py4J server <class>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3001" : {
+    "message" : [
+      "EOFException occurred while reading the port number from <daemonModule>'s stdout<additionalMessage>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3002" : {
+    "message" : [
+      "Data of type <other> is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3003" : {
+    "message" : [
+      "Could not compute split, block <blockId> of RDD <id> not found"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3004" : {
+    "message" : [
+      "Attempted to use <string> after its blocks have been removed!"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3005" : {
+    "message" : [
+      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3006" : {
+    "message" : [
+      "empty RDD"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3007" : {
+    "message" : [
+      "Checkpoint block <rddBlockId> not found! Either the executor",
+      "that originally checkpointed this partition is no longer alive, or the original RDD is",
+      "unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`",
+      "instead, which is slower than local checkpointing but more fault-tolerant."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3008" : {
+    "message" : [
+      "Cannot use map-side combining with array keys."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3009" : {
+    "message" : [
+      "HashPartitioner cannot partition array keys."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3010" : {
+    "message" : [
+      "reduceByKeyLocally() does not support array keys"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3011" : {
+    "message" : [
+      "This RDD lacks a SparkContext. It could happen in the following cases:",
+      "(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.",
+      "(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3012" : {
+    "message" : [
+      "Cannot change storage level of an RDD after it was already assigned a level"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3013" : {
+    "message" : [
+      "Can only zip RDDs with same number of elements in each partition"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3014" : {
+    "message" : [
+      "empty collection"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3015" : {
+    "message" : [
+      "countByValueApprox() does not support arrays"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3016" : {
+    "message" : [
+      "Checkpoint directory has not been set in the SparkContext"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3017" : {
+    "message" : [
+      "Invalid checkpoint file: <path>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3018" : {
+    "message" : [
+      "Failed to create checkpoint path <checkpointDirPath>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3019" : {
+    "message" : [
+      "Checkpoint RDD has a different number of partitions from original RDD. Original",
+      "RDD [ID: <originalRDDId>, num of partitions: <originalRDDLength>];",
+      "Checkpoint RDD [ID: <newRDDId>, num of partitions: <newRDDLength>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3020" : {
+    "message" : [
+      "Checkpoint dir must be specified."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3021" : {
+    "message" : [
+      "Error asking standalone scheduler to shut down executors"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3022" : {
+    "message" : [
+      "Error stopping standalone scheduler's driver endpoint"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3023" : {
+    "message" : [
+      "Can't run submitMapStage on RDD with 0 partitions"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3024" : {
+    "message" : [
+      "attempted to access non-existent accumulator <id>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3025" : {
+    "message" : [
+      "TaskSetManagers should only send Resubmitted task statuses for tasks in ShuffleMapStages."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3026" : {
+    "message" : [
+      "duration() called on unfinished task"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3027" : {
+    "message" : [
+      "Unrecognized <schedulerModeProperty>: <schedulingModeConf>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3028" : {
+    "message" : [
+      "<errorMsg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3029" : {
+    "message" : [
+      "Exiting due to error from cluster scheduler: <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3030" : {
+    "message" : [
+      "Task <currentTaskAttemptId> has not locked block <blockId> for writing"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3031" : {
+    "message" : [
+      "Block <blockId> does not exist"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3032" : {
+    "message" : [
+      "Error occurred while waiting for replication to finish"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3033" : {
+    "message" : [
+      "Unable to register with external shuffle server due to : <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3034" : {
+    "message" : [
+      "Error occurred while waiting for async. reregistration"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3035" : {
+    "message" : [
+      "Unexpected shuffle block <blockId> with unsupported shuffle resolver <shuffleBlockResolver>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3036" : {
+    "message" : [
+      "Failure while trying to store block <blockId> on <blockManagerId>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3037" : {
+    "message" : [
+      "Block <blockId> was not found even though it's read-locked"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3038" : {
+    "message" : [
+      "get() failed for block <blockId> even though we held a lock"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3039" : {
+    "message" : [
+      "BlockManager returned null for BlockStatus query: <blockId>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3040" : {
+    "message" : [
+      "BlockManagerMasterEndpoint returned false, expected true."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3041" : {
+    "message" : [
+      ""
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3042" : {
+    "message" : [
+      "Failed to get block <blockId>, which is not a shuffle block"
+    ]
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index 308ee003d5c..6c393e18bd8 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkException, TaskNotSerializableException}
+import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException}
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException}
 import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
@@ -35,36 +35,59 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockNotFoundException
  */
 private[spark] object SparkCoreErrors {
   def unexpectedPy4JServerError(other: Object): Throwable = {
-    new RuntimeException(s"Unexpected Py4J server ${other.getClass}")
+    new SparkRuntimeException(
+      errorClass = "_LEGACY_ERROR_TEMP_3000",
+      messageParameters = Map("class" -> s"${other.getClass}")
+    )
   }
 
   def eofExceptionWhileReadPortNumberError(
       daemonModule: String,
       daemonExitValue: Option[Int] = null): Throwable = {
-    val msg = s"EOFException occurred while reading the port number from $daemonModule's" +
-      s" stdout" + daemonExitValue.map(v => s" and terminated with code: $v.").getOrElse("")
-    new SparkException(msg)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3001",
+      messageParameters = Map(
+        "daemonModule" -> daemonModule,
+        "additionalMessage" ->
+          daemonExitValue.map(v => s" and terminated with code: $v.").getOrElse("")
+      ), cause = null
+    )
   }
 
   def unsupportedDataTypeError(other: Any): Throwable = {
-    new SparkException(s"Data of type $other is not supported")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3002",
+      messageParameters = Map("other" -> s"$other"),
+      cause = null
+    )
   }
 
   def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
-    new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3003",
+      messageParameters = Map("blockId" -> s"$blockId", "id" -> s"$id"),
+      cause = null
+    )
   }
 
   def blockHaveBeenRemovedError(string: String): Throwable = {
-    new SparkException(s"Attempted to use $string after its blocks have been removed!")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3004",
+      messageParameters = Map("string" -> string),
+      cause = null
+    )
   }
 
   def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = {
-    new UnsupportedOperationException(
-      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3005", messageParameters = Map.empty
+    )
   }
 
   def emptyRDDError(): Throwable = {
-    new UnsupportedOperationException("empty RDD")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3006", messageParameters = Map.empty
+    )
   }
 
   def pathNotSupportedError(path: String): Throwable = {
@@ -74,12 +97,10 @@ private[spark] object SparkCoreErrors {
 
   def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = {
     new SparkException(
-      s"""
-         |Checkpoint block $rddBlockId not found! Either the executor
-         |that originally checkpointed this partition is no longer alive, or the original RDD is
-         |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`
-         |instead, which is slower than local checkpointing but more fault-tolerant.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_3007",
+      messageParameters = Map("rddBlockId" -> s"$rddBlockId"),
+      cause = null
+    )
   }
 
   def endOfStreamError(): Throwable = {
@@ -87,55 +108,73 @@ private[spark] object SparkCoreErrors {
   }
 
   def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = {
-    new SparkException("Cannot use map-side combining with array keys.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3008", messageParameters = Map.empty, cause = null
+    )
   }
 
   def hashPartitionerCannotPartitionArrayKeyError(): Throwable = {
-    new SparkException("HashPartitioner cannot partition array keys.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3009", messageParameters = Map.empty, cause = null
+    )
   }
 
   def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = {
-    new SparkException("reduceByKeyLocally() does not support array keys")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3010", messageParameters = Map.empty, cause = null
+    )
   }
 
   def rddLacksSparkContextError(): Throwable = {
-    new SparkException("This RDD lacks a SparkContext. It could happen in the following cases: " +
-      "\n(1) RDD transformations and actions are NOT invoked by the driver, but inside of other " +
-      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
-      "because the values transformation and count action cannot be performed inside of the " +
-      "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
-      "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
-      "an RDD not defined by the streaming job is used in DStream operations. For more " +
-      "information, See SPARK-13758.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3011", messageParameters = Map.empty, cause = null
+    )
   }
 
   def cannotChangeStorageLevelError(): Throwable = {
-    new UnsupportedOperationException(
-      "Cannot change storage level of an RDD after it was already assigned a level")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3012", messageParameters = Map.empty
+    )
   }
 
   def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = {
-    new SparkException("Can only zip RDDs with same number of elements in each partition")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3013", messageParameters = Map.empty, cause = null
+    )
   }
 
   def emptyCollectionError(): Throwable = {
-    new UnsupportedOperationException("empty collection")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3014", messageParameters = Map.empty
+    )
   }
 
   def countByValueApproxNotSupportArraysError(): Throwable = {
-    new SparkException("countByValueApprox() does not support arrays")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3015", messageParameters = Map.empty, cause = null
+    )
   }
 
   def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = {
-    new SparkException("Checkpoint directory has not been set in the SparkContext")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3016", messageParameters = Map.empty, cause = null
+    )
   }
 
   def invalidCheckpointFileError(path: Path): Throwable = {
-    new SparkException(s"Invalid checkpoint file: $path")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3017",
+      messageParameters = Map("path" -> s"$path"),
+      cause = null
+    )
   }
 
   def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = {
-    new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3018",
+      messageParameters = Map("checkpointDirPath" -> s"$checkpointDirPath"),
+      cause = null
+    )
   }
 
   def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
@@ -144,11 +183,15 @@ private[spark] object SparkCoreErrors {
       newRDDId: Int,
       newRDDLength: Int): Throwable = {
     new SparkException(
-      s"""
-         |Checkpoint RDD has a different number of partitions from original RDD. Original
-         |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength];
-         |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength].
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_3019",
+      messageParameters = Map(
+        "originalRDDId" -> s"$originalRDDId",
+        "originalRDDLength" -> s"$originalRDDLength",
+        "newRDDId" -> s"$newRDDId",
+        "newRDDLength" -> s"$newRDDLength"
+      ),
+      cause = null
+    )
   }
 
   def checkpointFailedToSaveError(task: Int, path: Path): Throwable = {
@@ -157,15 +200,21 @@ private[spark] object SparkCoreErrors {
   }
 
   def mustSpecifyCheckpointDirError(): Throwable = {
-    new SparkException("Checkpoint dir must be specified.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3020", messageParameters = Map.empty, cause = null
+    )
   }
 
   def askStandaloneSchedulerToShutDownExecutorsError(e: Exception): Throwable = {
-    new SparkException("Error asking standalone scheduler to shut down executors", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3021", messageParameters = Map.empty, cause = e
+    )
   }
 
   def stopStandaloneSchedulerDriverEndpointError(e: Exception): Throwable = {
-    new SparkException("Error stopping standalone scheduler's driver endpoint", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3022", messageParameters = Map.empty, cause = e
+    )
   }
 
   def noExecutorIdleError(id: String): Throwable = {
@@ -187,16 +236,21 @@ private[spark] object SparkCoreErrors {
   }
 
   def cannotRunSubmitMapStageOnZeroPartitionRDDError(): Throwable = {
-    new SparkException("Can't run submitMapStage on RDD with 0 partitions")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3023", messageParameters = Map.empty, cause = null
+    )
   }
 
   def accessNonExistentAccumulatorError(id: Long): Throwable = {
-    new SparkException(s"attempted to access non-existent accumulator $id")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3024", messageParameters = Map("id" -> s"$id"), cause = null
+    )
   }
 
   def sendResubmittedTaskStatusForShuffleMapStagesOnlyError(): Throwable = {
-    new SparkException("TaskSetManagers should only send Resubmitted task " +
-      "statuses for tasks in ShuffleMapStages.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3025", messageParameters = Map.empty, cause = null
+    )
   }
 
   def nonEmptyEventQueueAfterTimeoutError(timeoutMillis: Long): Throwable = {
@@ -204,21 +258,38 @@ private[spark] object SparkCoreErrors {
   }
 
   def durationCalledOnUnfinishedTaskError(): Throwable = {
-    new UnsupportedOperationException("duration() called on unfinished task")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3026", messageParameters = Map.empty
+    )
   }
 
   def unrecognizedSchedulerModePropertyError(
       schedulerModeProperty: String,
       schedulingModeConf: String): Throwable = {
-    new SparkException(s"Unrecognized $schedulerModeProperty: $schedulingModeConf")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3027",
+      messageParameters = Map(
+        "schedulerModeProperty" -> schedulerModeProperty,
+        "schedulingModeConf" -> schedulingModeConf
+      ),
+      cause = null
+    )
   }
 
   def sparkError(errorMsg: String): Throwable = {
-    new SparkException(errorMsg)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3028",
+      messageParameters = Map("errorMsg" -> errorMsg),
+      cause = null
+    )
   }
 
   def clusterSchedulerError(message: String): Throwable = {
-    new SparkException(s"Exiting due to error from cluster scheduler: $message")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3029",
+      messageParameters = Map("message" -> message),
+      cause = null
+    )
   }
 
   def failToSerializeTaskError(e: Throwable): Throwable = {
@@ -230,11 +301,22 @@ private[spark] object SparkCoreErrors {
   }
 
   def taskHasNotLockedBlockError(currentTaskAttemptId: Long, blockId: BlockId): Throwable = {
-    new SparkException(s"Task $currentTaskAttemptId has not locked block $blockId for writing")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3030",
+      messageParameters = Map(
+        "currentTaskAttemptId" -> s"$currentTaskAttemptId",
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }
 
   def blockDoesNotExistError(blockId: BlockId): Throwable = {
-    new SparkException(s"Block $blockId does not exist")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3031",
+      messageParameters = Map("blockId" -> s"$blockId"),
+      cause = null
+    )
   }
 
   def cannotSaveBlockOnDecommissionedExecutorError(blockId: BlockId): Throwable = {
@@ -242,37 +324,69 @@ private[spark] object SparkCoreErrors {
   }
 
   def waitingForReplicationToFinishError(e: Throwable): Throwable = {
-    new SparkException("Error occurred while waiting for replication to finish", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3032", messageParameters = Map.empty, cause = e
+    )
   }
 
   def unableToRegisterWithExternalShuffleServerError(e: Throwable): Throwable = {
-    new SparkException(s"Unable to register with external shuffle server due to : ${e.getMessage}",
-      e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3033",
+      messageParameters = Map("message" -> e.getMessage),
+      cause = e
+    )
   }
 
   def waitingForAsyncReregistrationError(e: Throwable): Throwable = {
-    new SparkException("Error occurred while waiting for async. reregistration", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3034", messageParameters = Map.empty, cause = e
+    )
   }
 
   def unexpectedShuffleBlockWithUnsupportedResolverError(
       shuffleManager: ShuffleManager,
       blockId: BlockId): Throwable = {
-    new SparkException(s"Unexpected shuffle block ${blockId} with unsupported shuffle " +
-      s"resolver ${shuffleManager.shuffleBlockResolver}")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3035",
+      messageParameters = Map(
+        "blockId" -> s"$blockId",
+        "shuffleBlockResolver" -> s"${shuffleManager.shuffleBlockResolver}"
+      ),
+      cause = null
+    )
   }
 
   def failToStoreBlockOnBlockManagerError(
       blockManagerId: BlockManagerId,
       blockId: BlockId): Throwable = {
-    new SparkException(s"Failure while trying to store block $blockId on $blockManagerId.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3036",
+      messageParameters = Map(
+        "blockId" -> s"$blockId",
+        "blockManagerId" -> s"$blockManagerId"
+      ),
+      cause = null
+    )
   }
 
   def readLockedBlockNotFoundError(blockId: BlockId): Throwable = {
-    new SparkException(s"Block $blockId was not found even though it's read-locked")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3037",
+      messageParameters = Map(
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }
 
   def failToGetBlockWithLockError(blockId: BlockId): Throwable = {
-    new SparkException(s"get() failed for block $blockId even though we held a lock")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3038",
+      messageParameters = Map(
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }
 
   def blockNotFoundError(blockId: BlockId): Throwable = {
@@ -284,11 +398,17 @@ private[spark] object SparkCoreErrors {
   }
 
   def blockStatusQueryReturnedNullError(blockId: BlockId): Throwable = {
-    new SparkException(s"BlockManager returned null for BlockStatus query: $blockId")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3039",
+      messageParameters = Map("blockId" -> s"$blockId"),
+      cause = null
+    )
   }
 
   def unexpectedBlockManagerMasterEndpointResultError(): Throwable = {
-    new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3040", messageParameters = Map.empty, cause = null
+    )
   }
 
   def failToCreateDirectoryError(path: String, maxAttempts: Int): Throwable = {
@@ -297,7 +417,9 @@ private[spark] object SparkCoreErrors {
   }
 
   def unsupportedOperationError(): Throwable = {
-    new UnsupportedOperationException()
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3041", messageParameters = Map.empty
+    )
   }
 
   def noSuchElementError(): Throwable = {
@@ -316,7 +438,11 @@ private[spark] object SparkCoreErrors {
   }
 
   def failToGetNonShuffleBlockError(blockId: BlockId, e: Throwable): Throwable = {
-    new SparkException(s"Failed to get block $blockId, which is not a shuffle block", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3042",
+      messageParameters = Map("blockId" -> s"$blockId"),
+      cause = e
+    )
   }
 
   def graphiteSinkInvalidProtocolError(invalidProtocol: String): Throwable = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org