Posted to commits@spark.apache.org by we...@apache.org on 2021/07/28 14:02:25 UTC

[spark] branch master updated: [SPARK-36095][CORE] Grouping exception in core/rdd

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af6d04b  [SPARK-36095][CORE] Grouping exception in core/rdd
af6d04b is described below

commit af6d04b65ca45c0d8e1aacc93da7be271b586506
Author: dgd-contributor <dg...@viettel.com.vn>
AuthorDate: Wed Jul 28 22:01:26 2021 +0800

    [SPARK-36095][CORE] Grouping exception in core/rdd
    
    ### What changes were proposed in this pull request?
    This PR groups the exception messages thrown in core/src/main/scala/org/apache/spark/rdd
    into a single SparkCoreErrors object, as sketched below.
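    
    Minimal sketch of the pattern, simplified from the diff below (the CallSiteExample
    object and the firstOrThrow helper are illustrative only and not part of this change):
    
        // Centralized error constructors (the real file is SparkCoreErrors.scala in the diff below).
        object SparkCoreErrors {
          def emptyCollectionError(): Throwable =
            new UnsupportedOperationException("empty collection")
        }
    
        // Hypothetical call site: throw through the shared object instead of
        // constructing the exception message inline.
        object CallSiteExample {
          def firstOrThrow[T](xs: Seq[T]): T =
            xs.headOption.getOrElse(throw SparkCoreErrors.emptyCollectionError())
        }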
    
    ### Why are the changes needed?
    Centralizing the messages helps standardize error reporting and makes the messages easier to maintain.
    
    ### Does this PR introduce _any_ user-facing change?
    No. Error messages remain unchanged.
    
    ### How was this patch tested?
    No new tests; the existing test suites pass, confirming that the change does not alter any existing behavior.
    
    Closes #33317 from dgd-contributor/SPARK-36095_GroupExceptionCoreRdd.
    
    Lead-authored-by: dgd-contributor <dg...@viettel.com.vn>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 144 +++++++++++++++++++++
 .../main/scala/org/apache/spark/rdd/BlockRDD.scala |   6 +-
 .../org/apache/spark/rdd/DoubleRDDFunctions.scala  |   4 +-
 .../main/scala/org/apache/spark/rdd/EmptyRDD.scala |   3 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |   4 +-
 .../org/apache/spark/rdd/LocalCheckpointRDD.scala  |   9 +-
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala  |   3 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |  15 ++-
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala |   3 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |  27 ++--
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   |  17 +--
 .../spark/rdd/ReliableRDDCheckpointData.scala      |   3 +-
 12 files changed, 186 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
new file mode 100644
index 0000000..a29f375
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.errors
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.{BlockId, RDDBlockId}
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown during query execution.
+ */
+object SparkCoreErrors {
+  def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
+    new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+  }
+
+  def blockHaveBeenRemovedError(string: String): Throwable = {
+    new SparkException(s"Attempted to use $string after its blocks have been removed!")
+  }
+
+  def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = {
+    new UnsupportedOperationException(
+      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+  }
+
+  def emptyRDDError(): Throwable = {
+    new UnsupportedOperationException("empty RDD")
+  }
+
+  def pathNotSupportedError(path: String): Throwable = {
+    new IOException(s"Path: ${path} is a directory, which is not supported by the " +
+      "record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+  }
+
+  def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = {
+    new SparkException(
+      s"""
+         |Checkpoint block $rddBlockId not found! Either the executor
+         |that originally checkpointed this partition is no longer alive, or the original RDD is
+         |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`
+         |instead, which is slower than local checkpointing but more fault-tolerant.
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def endOfStreamError(): Throwable = {
+    new java.util.NoSuchElementException("End of stream")
+  }
+
+  def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = {
+    new SparkException("Cannot use map-side combining with array keys.")
+  }
+
+  def hashPartitionerCannotPartitionArrayKeyError(): Throwable = {
+    new SparkException("HashPartitioner cannot partition array keys.")
+  }
+
+  def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = {
+    new SparkException("reduceByKeyLocally() does not support array keys")
+  }
+
+  def noSuchElementException(): Throwable = {
+    new NoSuchElementException()
+  }
+
+  def rddLacksSparkContextError(): Throwable = {
+    new SparkException("This RDD lacks a SparkContext. It could happen in the following cases: " +
+      "\n(1) RDD transformations and actions are NOT invoked by the driver, but inside of other " +
+      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
+      "because the values transformation and count action cannot be performed inside of the " +
+      "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
+      "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
+      "an RDD not defined by the streaming job is used in DStream operations. For more " +
+      "information, See SPARK-13758.")
+  }
+
+  def cannotChangeStorageLevelError(): Throwable = {
+    new UnsupportedOperationException(
+      "Cannot change storage level of an RDD after it was already assigned a level")
+  }
+
+  def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = {
+    new SparkException("Can only zip RDDs with same number of elements in each partition")
+  }
+
+  def emptyCollectionError(): Throwable = {
+    new UnsupportedOperationException("empty collection")
+  }
+
+  def countByValueApproxNotSupportArraysError(): Throwable = {
+    new SparkException("countByValueApprox() does not support arrays")
+  }
+
+  def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = {
+    new SparkException("Checkpoint directory has not been set in the SparkContext")
+  }
+
+  def invalidCheckpointFileError(path: Path): Throwable = {
+    new SparkException(s"Invalid checkpoint file: $path")
+  }
+
+  def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = {
+    new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+  }
+
+  def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+      originalRDDId: Int,
+      originalRDDLength: Int,
+      newRDDId: Int,
+      newRDDLength: Int): Throwable = {
+    new SparkException(
+      s"""
+         |Checkpoint RDD has a different number of partitions from original RDD. Original
+         |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength];
+         |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength].
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def checkpointFailedToSaveError(task: Int, path: Path): Throwable = {
+    new IOException("Checkpoint failed: failed to save output of task: " +
+      s"$task and final output path does not exist: $path")
+  }
+
+  def mustSpecifyCheckpointDirError(): Throwable = {
+    new SparkException("Checkpoint dir must be specified.")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index a5c3e2a..05cad3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.storage.{BlockId, BlockManager}
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
@@ -47,7 +48,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
     blockManager.get[T](blockId) match {
       case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
-        throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+        throw SparkCoreErrors.rddBlockNotFoundError(blockId, id)
     }
   }
 
@@ -79,8 +80,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
   /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
   private[spark] def assertValid(): Unit = {
     if (!isValid) {
-      throw new SparkException(
-        "Attempted to use %s after its blocks have been removed!".format(toString))
+      throw SparkCoreErrors.blockHaveBeenRemovedError(toString)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 39f6956..9c97e02 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.Since
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.MeanEvaluator
@@ -135,8 +136,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
     }
     if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
-      throw new UnsupportedOperationException(
-        "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+      throw SparkCoreErrors.histogramOnEmptyRDDOrContainingInfinityOrNaNError()
     }
     val range = if (min != max) {
       // Range.Double.inclusive(min, max, increment)
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index a2d7e34..8b75b3c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 
 /**
  * An RDD that has no partitions and no elements.
@@ -29,6 +30,6 @@ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc,
   override def getPartitions: Array[Partition] = Array.empty
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    throw new UnsupportedOperationException("empty RDD")
+    throw SparkCoreErrors.emptyRDDError()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fc0b4f..7011451 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -234,8 +235,7 @@ class HadoopRDD[K, V](
         Array.empty[Partition]
       case e: IOException if e.getMessage.startsWith("Not a file:") =>
         val path = e.getMessage.split(":").map(_.trim).apply(2)
-        throw new IOException(s"Path: ${path} is a directory, which is not supported by the " +
-          s"record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+        throw SparkCoreErrors.pathNotSupportedError(path)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index 113ed2d..342cf66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -19,7 +19,8 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.storage.RDDBlockId
 
 /**
@@ -57,11 +58,7 @@ private[spark] class LocalCheckpointRDD[T: ClassTag](
    * available in the block storage.
    */
   override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
-    throw new SparkException(
-      s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " +
-      s"that originally checkpointed this partition is no longer alive, or the original RDD is " +
-      s"unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` " +
-      s"instead, which is slower than local checkpointing but more fault-tolerant.")
+    throw SparkCoreErrors.checkpointRDDBlockIdNotFoundError(RDDBlockId(rddId, partition.index))
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a7a6cf4..7fdbe7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
@@ -270,7 +271,7 @@ class NewHadoopRDD[K, V](
 
       override def next(): (K, V) = {
         if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
+          throw SparkCoreErrors.endOfStreamError()
         }
         havePair = false
         if (!finished) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f280c22..4dd2967 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.internal.io._
@@ -76,10 +77,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
     if (keyClass.isArray) {
       if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array keys.")
+        throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
       }
       if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("HashPartitioner cannot partition array keys.")
+        throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
       }
     }
     val aggregator = new Aggregator[K, V, C](
@@ -331,7 +332,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cleanedF = self.sparkContext.clean(func)
 
     if (keyClass.isArray) {
-      throw new SparkException("reduceByKeyLocally() does not support array keys")
+      throw SparkCoreErrors.reduceByKeyLocallyNotSupportArrayKeysError()
     }
 
     val reducePartition = (iter: Iterator[(K, V)]) => {
@@ -524,7 +525,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
     if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     if (self.partitioner == Some(partitioner)) {
       self
@@ -776,7 +777,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
@@ -794,7 +795,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
     cg.mapValues { case Array(vs, w1s) =>
@@ -809,7 +810,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Array(vs, w1s, w2s) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 5dd8cb8..3cf0dd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -33,6 +33,7 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.util.Utils
 
 
@@ -184,7 +185,7 @@ private[spark] class PipedRDD[T: ClassTag](
     new Iterator[String] {
       def next(): String = {
         if (!hasNext()) {
-          throw new NoSuchElementException()
+          throw SparkCoreErrors.noSuchElementException()
         }
         lines.next()
       }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index eb7bf4d..35e53b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
@@ -92,15 +93,7 @@ abstract class RDD[T: ClassTag](
 
   private def sc: SparkContext = {
     if (_sc == null) {
-      throw new SparkException(
-        "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
-        "transformations and actions are NOT invoked by the driver, but inside of other " +
-        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
-        "because the values transformation and count action cannot be performed inside of the " +
-        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
-        "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
-        "an RDD not defined by the streaming job is used in DStream operations. For more " +
-        "information, See SPARK-13758.")
+      throw SparkCoreErrors.rddLacksSparkContextError()
     }
     _sc
   }
@@ -172,8 +165,7 @@ abstract class RDD[T: ClassTag](
   private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
-      throw new UnsupportedOperationException(
-        "Cannot change storage level of an RDD after it was already assigned a level")
+      throw SparkCoreErrors.cannotChangeStorageLevelError()
     }
     // If this is the first time this RDD is marked for persisting, register it
     // with the SparkContext for cleanups and accounting. Do this only once.
@@ -951,8 +943,7 @@ abstract class RDD[T: ClassTag](
         def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
           case (true, true) => true
           case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+          case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
         }
         def next(): (T, U) = (thisIter.next(), otherIter.next())
       }
@@ -1119,7 +1110,7 @@ abstract class RDD[T: ClassTag](
     }
     sc.runJob(this, reducePartition, mergeResult)
     // Get the final result out of our Option, or throw an exception if the RDD was empty
-    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+    jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError())
   }
 
   /**
@@ -1151,7 +1142,7 @@ abstract class RDD[T: ClassTag](
       }
     }
     partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
-      .getOrElse(throw new UnsupportedOperationException("empty collection"))
+      .getOrElse(throw SparkCoreErrors.emptyCollectionError())
   }
 
   /**
@@ -1311,7 +1302,7 @@ abstract class RDD[T: ClassTag](
       : PartialResult[Map[T, BoundedDouble]] = withScope {
     require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
     if (elementClassTag.runtimeClass.isArray) {
-      throw new SparkException("countByValueApprox() does not support arrays")
+      throw SparkCoreErrors.countByValueApproxNotSupportArraysError()
     }
     val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
       val map = new OpenHashMap[T, Long]
@@ -1462,7 +1453,7 @@ abstract class RDD[T: ClassTag](
   def first(): T = withScope {
     take(1) match {
       case Array(t) => t
-      case _ => throw new UnsupportedOperationException("empty collection")
+      case _ => throw SparkCoreErrors.emptyCollectionError()
     }
   }
 
@@ -1612,7 +1603,7 @@ abstract class RDD[T: ClassTag](
     // children RDD partitions point to the correct parent partitions. In the future
     // we should revisit this consideration.
     if (context.checkpointDir.isEmpty) {
-      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
+      throw SparkCoreErrors.checkpointDirectoryHasNotBeenSetInSparkContextError()
     } else if (checkpointData.isEmpty) {
       checkpointData = Some(new ReliableRDDCheckpointData(this))
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 5093a12..7339eb6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.FileNotFoundException
 import java.util.concurrent.TimeUnit
 
 import scala.reflect.ClassTag
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
 import org.apache.spark.io.CompressionCodec
@@ -77,7 +78,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
     // Fail fast if input files are invalid
     inputFiles.zipWithIndex.foreach { case (path, i) =>
       if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
-        throw new SparkException(s"Invalid checkpoint file: $path")
+        throw SparkCoreErrors.invalidCheckpointFileError(path)
       }
     }
     Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
@@ -155,7 +156,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val checkpointDirPath = new Path(checkpointDir)
     val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
     if (!fs.mkdirs(checkpointDirPath)) {
-      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+      throw SparkCoreErrors.failToCreateCheckpointPathError(checkpointDirPath)
     }
 
     // Save to file, and reload it as an RDD
@@ -176,11 +177,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val newRDD = new ReliableCheckpointRDD[T](
       sc, checkpointDirPath.toString, originalRDD.partitioner)
     if (newRDD.partitions.length != originalRDD.partitions.length) {
-      throw new SparkException(
-        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
-          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
-          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
-          s"${newRDD.partitions.length}].")
+      throw SparkCoreErrors.checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+        originalRDD.id, originalRDD.partitions.length, newRDD.id, newRDD.partitions.length)
     }
     newRDD
   }
@@ -231,8 +229,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       if (!fs.exists(finalOutputPath)) {
         logInfo(s"Deleting tempOutputPath $tempOutputPath")
         fs.delete(tempOutputPath, false)
-        throw new IOException("Checkpoint failed: failed to save output of task: " +
-          s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
+        throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath)
       } else {
         // Some other copy of this task must've finished before us and renamed it
         logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 7a592ab..0a26b7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
 
@@ -37,7 +38,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
   private val cpDir: String =
     ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)
       .map(_.toString)
-      .getOrElse { throw new SparkException("Checkpoint dir must be specified.") }
+      .getOrElse { throw SparkCoreErrors.mustSpecifyCheckpointDirError() }
 
   /**
    * Return the directory to which this RDD was checkpointed.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org