You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/28 14:02:25 UTC
[spark] branch master updated: [SPARK-36095][CORE] Grouping exception in core/rdd
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new af6d04b [SPARK-36095][CORE] Grouping exception in core/rdd
af6d04b is described below
commit af6d04b65ca45c0d8e1aacc93da7be271b586506
Author: dgd-contributor <dg...@viettel.com.vn>
AuthorDate: Wed Jul 28 22:01:26 2021 +0800
[SPARK-36095][CORE] Grouping exception in core/rdd
### What changes were proposed in this pull request?
This PR groups the exception messages thrown in core/src/main/scala/org/apache/spark/rdd into a single object, SparkCoreErrors.
### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.
### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.
### How was this patch tested?
No new tests were added; all existing tests were run to make sure the change doesn't break any existing behavior.
Closes #33317 from dgd-contributor/SPARK-36095_GroupExceptionCoreRdd.
Lead-authored-by: dgd-contributor <dg...@viettel.com.vn>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/errors/SparkCoreErrors.scala | 144 +++++++++++++++++++++
.../main/scala/org/apache/spark/rdd/BlockRDD.scala | 6 +-
.../org/apache/spark/rdd/DoubleRDDFunctions.scala | 4 +-
.../main/scala/org/apache/spark/rdd/EmptyRDD.scala | 3 +-
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +-
.../org/apache/spark/rdd/LocalCheckpointRDD.scala | 9 +-
.../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 +-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 15 ++-
.../main/scala/org/apache/spark/rdd/PipedRDD.scala | 3 +-
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 27 ++--
.../apache/spark/rdd/ReliableCheckpointRDD.scala | 17 +--
.../spark/rdd/ReliableRDDCheckpointData.scala | 3 +-
12 files changed, 186 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
new file mode 100644
index 0000000..a29f375
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.errors
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.{BlockId, RDDBlockId}
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown in Spark core.
+ */
+object SparkCoreErrors {
+ def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
+ new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+ }
+
+ def blockHaveBeenRemovedError(string: String): Throwable = {
+ new SparkException(s"Attempted to use $string after its blocks have been removed!")
+ }
+
+ def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = {
+ new UnsupportedOperationException(
+ "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+ }
+
+ def emptyRDDError(): Throwable = {
+ new UnsupportedOperationException("empty RDD")
+ }
+
+ def pathNotSupportedError(path: String): Throwable = {
+ new IOException(s"Path: ${path} is a directory, which is not supported by the " +
+ "record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+ }
+
+ def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = {
+ new SparkException(
+ s"""
+ |Checkpoint block $rddBlockId not found! Either the executor
+ |that originally checkpointed this partition is no longer alive, or the original RDD is
+ |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`
+ |instead, which is slower than local checkpointing but more fault-tolerant.
+ """.stripMargin.replaceAll("\n", " "))
+ }
+
+ def endOfStreamError(): Throwable = {
+ new java.util.NoSuchElementException("End of stream")
+ }
+
+ def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = {
+ new SparkException("Cannot use map-side combining with array keys.")
+ }
+
+ def hashPartitionerCannotPartitionArrayKeyError(): Throwable = {
+ new SparkException("HashPartitioner cannot partition array keys.")
+ }
+
+ def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = {
+ new SparkException("reduceByKeyLocally() does not support array keys")
+ }
+
+ def noSuchElementException(): Throwable = {
+ new NoSuchElementException()
+ }
+
+ def rddLacksSparkContextError(): Throwable = {
+ new SparkException("This RDD lacks a SparkContext. It could happen in the following cases: " +
+ "\n(1) RDD transformations and actions are NOT invoked by the driver, but inside of other " +
+ "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
+ "because the values transformation and count action cannot be performed inside of the " +
+ "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
+ "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
+ "an RDD not defined by the streaming job is used in DStream operations. For more " +
+ "information, See SPARK-13758.")
+ }
+
+ def cannotChangeStorageLevelError(): Throwable = {
+ new UnsupportedOperationException(
+ "Cannot change storage level of an RDD after it was already assigned a level")
+ }
+
+ def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = {
+ new SparkException("Can only zip RDDs with same number of elements in each partition")
+ }
+
+ def emptyCollectionError(): Throwable = {
+ new UnsupportedOperationException("empty collection")
+ }
+
+ def countByValueApproxNotSupportArraysError(): Throwable = {
+ new SparkException("countByValueApprox() does not support arrays")
+ }
+
+ def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = {
+ new SparkException("Checkpoint directory has not been set in the SparkContext")
+ }
+
+ def invalidCheckpointFileError(path: Path): Throwable = {
+ new SparkException(s"Invalid checkpoint file: $path")
+ }
+
+ def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = {
+ new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+ }
+
+ def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+ originalRDDId: Int,
+ originalRDDLength: Int,
+ newRDDId: Int,
+ newRDDLength: Int): Throwable = {
+ new SparkException(
+ s"""
+ |Checkpoint RDD has a different number of partitions from original RDD. Original
+ |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength];
+ |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength].
+ """.stripMargin.replaceAll("\n", " "))
+ }
+
+ def checkpointFailedToSaveError(task: Int, path: Path): Throwable = {
+ new IOException("Checkpoint failed: failed to save output of task: " +
+ s"$task and final output path does not exist: $path")
+ }
+
+ def mustSpecifyCheckpointDirError(): Throwable = {
+ new SparkException("Checkpoint dir must be specified.")
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index a5c3e2a..05cad3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.storage.{BlockId, BlockManager}
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
@@ -47,7 +48,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
blockManager.get[T](blockId) match {
case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
- throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+ throw SparkCoreErrors.rddBlockNotFoundError(blockId, id)
}
}
@@ -79,8 +80,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
/** Check if this BlockRDD is valid. If not valid, exception is thrown. */
private[spark] def assertValid(): Unit = {
if (!isValid) {
- throw new SparkException(
- "Attempted to use %s after its blocks have been removed!".format(toString))
+ throw SparkCoreErrors.blockHaveBeenRemovedError(toString)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 39f6956..9c97e02 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import org.apache.spark.TaskContext
import org.apache.spark.annotation.Since
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.MeanEvaluator
@@ -135,8 +136,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
- throw new UnsupportedOperationException(
- "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+ throw SparkCoreErrors.histogramOnEmptyRDDOrContainingInfinityOrNaNError()
}
val range = if (min != max) {
// Range.Double.inclusive(min, max, increment)
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index a2d7e34..8b75b3c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
/**
* An RDD that has no partitions and no elements.
@@ -29,6 +30,6 @@ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc,
override def getPartitions: Array[Partition] = Array.empty
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- throw new UnsupportedOperationException("empty RDD")
+ throw SparkCoreErrors.emptyRDDError()
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fc0b4f..7011451 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -234,8 +235,7 @@ class HadoopRDD[K, V](
Array.empty[Partition]
case e: IOException if e.getMessage.startsWith("Not a file:") =>
val path = e.getMessage.split(":").map(_.trim).apply(2)
- throw new IOException(s"Path: ${path} is a directory, which is not supported by the " +
- s"record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+ throw SparkCoreErrors.pathNotSupportedError(path)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index 113ed2d..342cf66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -19,7 +19,8 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.storage.RDDBlockId
/**
@@ -57,11 +58,7 @@ private[spark] class LocalCheckpointRDD[T: ClassTag](
* available in the block storage.
*/
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
- throw new SparkException(
- s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " +
- s"that originally checkpointed this partition is no longer alive, or the original RDD is " +
- s"unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` " +
- s"instead, which is slower than local checkpointing but more fault-tolerant.")
+ throw SparkCoreErrors.checkpointRDDBlockIdNotFoundError(RDDBlockId(rddId, partition.index))
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a7a6cf4..7fdbe7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
@@ -270,7 +271,7 @@ class NewHadoopRDD[K, V](
override def next(): (K, V) = {
if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
+ throw SparkCoreErrors.endOfStreamError()
}
havePair = false
if (!finished) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f280c22..4dd2967 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.internal.io._
@@ -76,10 +77,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
- throw new SparkException("Cannot use map-side combining with array keys.")
+ throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
}
if (partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("HashPartitioner cannot partition array keys.")
+ throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
}
}
val aggregator = new Aggregator[K, V, C](
@@ -331,7 +332,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val cleanedF = self.sparkContext.clean(func)
if (keyClass.isArray) {
- throw new SparkException("reduceByKeyLocally() does not support array keys")
+ throw SparkCoreErrors.reduceByKeyLocallyNotSupportArrayKeysError()
}
val reducePartition = (iter: Iterator[(K, V)]) => {
@@ -524,7 +525,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("HashPartitioner cannot partition array keys.")
+ throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
}
if (self.partitioner == Some(partitioner)) {
self
@@ -776,7 +777,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("HashPartitioner cannot partition array keys.")
+ throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
@@ -794,7 +795,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("HashPartitioner cannot partition array keys.")
+ throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
@@ -809,7 +810,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
- throw new SparkException("HashPartitioner cannot partition array keys.")
+ throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 5dd8cb8..3cf0dd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -33,6 +33,7 @@ import scala.io.Source
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.util.Utils
@@ -184,7 +185,7 @@ private[spark] class PipedRDD[T: ClassTag](
new Iterator[String] {
def next(): String = {
if (!hasNext()) {
- throw new NoSuchElementException()
+ throw SparkCoreErrors.noSuchElementException()
}
lines.next()
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index eb7bf4d..35e53b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
@@ -92,15 +93,7 @@ abstract class RDD[T: ClassTag](
private def sc: SparkContext = {
if (_sc == null) {
- throw new SparkException(
- "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
- "transformations and actions are NOT invoked by the driver, but inside of other " +
- "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
- "because the values transformation and count action cannot be performed inside of the " +
- "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
- "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
- "an RDD not defined by the streaming job is used in DStream operations. For more " +
- "information, See SPARK-13758.")
+ throw SparkCoreErrors.rddLacksSparkContextError()
}
_sc
}
@@ -172,8 +165,7 @@ abstract class RDD[T: ClassTag](
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
- throw new UnsupportedOperationException(
- "Cannot change storage level of an RDD after it was already assigned a level")
+ throw SparkCoreErrors.cannotChangeStorageLevelError()
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
@@ -951,8 +943,7 @@ abstract class RDD[T: ClassTag](
def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
- case _ => throw new SparkException("Can only zip RDDs with " +
- "same number of elements in each partition")
+ case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
}
def next(): (T, U) = (thisIter.next(), otherIter.next())
}
@@ -1119,7 +1110,7 @@ abstract class RDD[T: ClassTag](
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
- jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+ jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError())
}
/**
@@ -1151,7 +1142,7 @@ abstract class RDD[T: ClassTag](
}
}
partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
- .getOrElse(throw new UnsupportedOperationException("empty collection"))
+ .getOrElse(throw SparkCoreErrors.emptyCollectionError())
}
/**
@@ -1311,7 +1302,7 @@ abstract class RDD[T: ClassTag](
: PartialResult[Map[T, BoundedDouble]] = withScope {
require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
if (elementClassTag.runtimeClass.isArray) {
- throw new SparkException("countByValueApprox() does not support arrays")
+ throw SparkCoreErrors.countByValueApproxNotSupportArraysError()
}
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
val map = new OpenHashMap[T, Long]
@@ -1462,7 +1453,7 @@ abstract class RDD[T: ClassTag](
def first(): T = withScope {
take(1) match {
case Array(t) => t
- case _ => throw new UnsupportedOperationException("empty collection")
+ case _ => throw SparkCoreErrors.emptyCollectionError()
}
}
@@ -1612,7 +1603,7 @@ abstract class RDD[T: ClassTag](
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
- throw new SparkException("Checkpoint directory has not been set in the SparkContext")
+ throw SparkCoreErrors.checkpointDirectoryHasNotBeenSetInSparkContextError()
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 5093a12..7339eb6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.{FileNotFoundException, IOException}
+import java.io.FileNotFoundException
import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
import org.apache.spark.io.CompressionCodec
@@ -77,7 +78,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
// Fail fast if input files are invalid
inputFiles.zipWithIndex.foreach { case (path, i) =>
if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
- throw new SparkException(s"Invalid checkpoint file: $path")
+ throw SparkCoreErrors.invalidCheckpointFileError(path)
}
}
Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
@@ -155,7 +156,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val checkpointDirPath = new Path(checkpointDir)
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (!fs.mkdirs(checkpointDirPath)) {
- throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+ throw SparkCoreErrors.failToCreateCheckpointPathError(checkpointDirPath)
}
// Save to file, and reload it as an RDD
@@ -176,11 +177,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
- throw new SparkException(
- "Checkpoint RDD has a different number of partitions from original RDD. Original " +
- s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
- s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
- s"${newRDD.partitions.length}].")
+ throw SparkCoreErrors.checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+ originalRDD.id, originalRDD.partitions.length, newRDD.id, newRDD.partitions.length)
}
newRDD
}
@@ -231,8 +229,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
if (!fs.exists(finalOutputPath)) {
logInfo(s"Deleting tempOutputPath $tempOutputPath")
fs.delete(tempOutputPath, false)
- throw new IOException("Checkpoint failed: failed to save output of task: " +
- s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
+ throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath)
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 7a592ab..0a26b7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
@@ -37,7 +38,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
private val cpDir: String =
ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)
.map(_.toString)
- .getOrElse { throw new SparkException("Checkpoint dir must be specified.") }
+ .getOrElse { throw SparkCoreErrors.mustSpecifyCheckpointDirError() }
/**
* Return the directory to which this RDD was checkpointed.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org