You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/08 15:56:16 UTC
[spark] branch branch-3.2 updated: [SPARK-35958][CORE] Refactor
SparkError.scala to SparkThrowable.java
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f31cf16 [SPARK-35958][CORE] Refactor SparkError.scala to SparkThrowable.java
f31cf16 is described below
commit f31cf163d9adc471eb9acfd83376896c2a28ad05
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Thu Jul 8 23:54:53 2021 +0800
[SPARK-35958][CORE] Refactor SparkError.scala to SparkThrowable.java
### What changes were proposed in this pull request?
Refactors the base Throwable trait `SparkError.scala` (introduced in SPARK-34920) an interface `SparkThrowable.java`.
### Why are the changes needed?
- Renaming `SparkError` to `SparkThrowable` better reflect sthat this is the base interface for both `Exception` and `Error`
- Migrating to Java maximizes its extensibility
### Does this PR introduce _any_ user-facing change?
Yes; the base trait has been renamed and the accessor methods have changed (eg. `sqlState` -> `getSqlState()`).
### How was this patch tested?
Unit tests.
Closes #33164 from karenfeng/SPARK-35958.
Authored-by: Karen Feng <ka...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 71c086eb87b7610aa39bb0766fbabe4ef371c6a4)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.github/PULL_REQUEST_TEMPLATE | 4 +--
.../main/java/org/apache/spark/SparkThrowable.java | 42 ++++++++++++++++++++++
.../org/apache/spark/memory/MemoryConsumer.java | 4 +--
.../apache/spark/memory/SparkOutOfMemoryError.java | 20 ++++++++++-
core/src/main/resources/error/README.md | 39 +++++++++++---------
core/src/main/resources/error/error-classes.json | 3 ++
.../spark/{SparkError.scala => ErrorInfo.scala} | 28 ++++-----------
.../scala/org/apache/spark/SparkException.scala | 27 +++++++-------
...kErrorSuite.scala => SparkThrowableSuite.scala} | 41 ++++++++++++++-------
.../shuffle/sort/ShuffleExternalSorterSuite.scala | 5 ++-
.../org/apache/spark/sql/AnalysisException.scala | 19 +++++-----
.../sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
.../spark/sql/catalyst/analysis/package.scala | 2 +-
.../spark/sql/catalyst/parser/ParseDriver.scala | 12 +++----
.../spark/sql/errors/QueryCompilationErrors.scala | 2 +-
.../spark/sql/errors/QueryExecutionErrors.scala | 4 +--
.../spark/sql/errors/QueryParsingErrors.scala | 2 +-
.../spark/sql/connector/DataSourceV2Suite.scala | 4 +--
.../hive/thriftserver/HiveThriftServerErrors.scala | 8 ++---
.../sql/hive/thriftserver/SparkSQLDriver.scala | 8 ++---
20 files changed, 175 insertions(+), 101 deletions(-)
diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE
index 66c28ee..7648e396 100644
--- a/.github/PULL_REQUEST_TEMPLATE
+++ b/.github/PULL_REQUEST_TEMPLATE
@@ -8,8 +8,8 @@ Thanks for sending a pull request! Here are some tips for you:
6. If possible, provide a concise example to reproduce the issue for a faster review.
7. If you want to add a new configuration, please read the guideline first for naming configurations in
'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
- 8. If you want to add or modify an error message, please read the guideline first:
- https://spark.apache.org/error-message-guidelines.html
+ 8. If you want to add or modify an error type or message, please read the guideline first in
+ 'core/src/main/resources/error/README.md'.
-->
### What changes were proposed in this pull request?
diff --git a/core/src/main/java/org/apache/spark/SparkThrowable.java b/core/src/main/java/org/apache/spark/SparkThrowable.java
new file mode 100644
index 0000000..31a9ab0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkThrowable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Interface mixed into Throwables thrown from Spark.
+ *
+ * - For backwards compatibility, existing Throwable types can be thrown with an arbitrary error
+ * message with a null error class. See [[SparkException]].
+ * - To promote standardization, Throwables should be thrown with an error class and message
+ * parameters to construct an error message with SparkThrowableHelper.getMessage(). New Throwable
+ * types should not accept arbitrary error messages. See [[SparkArithmeticException]].
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface SparkThrowable {
+ // Succinct, human-readable, unique, and consistent representation of the error category
+ // If null, error class is not set
+ String getErrorClass();
+
+ // Portable error identifier across SQL engines
+ // If null, error class or SQLSTATE is not set
+ String getSqlState();
+}
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 1d361ae..fc86893 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -154,8 +154,8 @@ public abstract class MemoryConsumer {
}
taskMemoryManager.showMemoryUsage();
// checkstyle.off: RegexpSinglelineJava
- throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " +
- got);
+ throw new SparkOutOfMemoryError("UNABLE_TO_ACQUIRE_MEMORY",
+ new String[]{Long.toString(required), Long.toString(got)});
// checkstyle.on: RegexpSinglelineJava
}
}
diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index ca00ca5..bf7984b 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -16,6 +16,8 @@
*/
package org.apache.spark.memory;
+import org.apache.spark.SparkThrowable;
+import org.apache.spark.SparkThrowableHelper;
import org.apache.spark.annotation.Private;
/**
@@ -24,7 +26,9 @@ import org.apache.spark.annotation.Private;
* we should use throw this exception, which just kills the current task.
*/
@Private
-public final class SparkOutOfMemoryError extends OutOfMemoryError {
+public final class SparkOutOfMemoryError extends OutOfMemoryError implements SparkThrowable {
+ String errorClass;
+ String[] messageParameters;
public SparkOutOfMemoryError(String s) {
super(s);
@@ -33,4 +37,18 @@ public final class SparkOutOfMemoryError extends OutOfMemoryError {
public SparkOutOfMemoryError(OutOfMemoryError e) {
super(e.getMessage());
}
+
+ public SparkOutOfMemoryError(String errorClass, String[] messageParameters) {
+ super(SparkThrowableHelper.getMessage(errorClass, messageParameters));
+ this.errorClass = errorClass;
+ this.messageParameters = messageParameters;
+ }
+
+ public String getErrorClass() {
+ return errorClass;
+ }
+
+ public String getSqlState() {
+ return SparkThrowableHelper.getSqlState(errorClass);
+ }
}
diff --git a/core/src/main/resources/error/README.md b/core/src/main/resources/error/README.md
index df10b28..18f729c 100644
--- a/core/src/main/resources/error/README.md
+++ b/core/src/main/resources/error/README.md
@@ -1,27 +1,24 @@
# Guidelines
-To throw a standardized user-facing exception, developers should specify the error class and
-message parameters rather than an arbitrary error message.
+To throw a standardized user-facing error or exception, developers should specify the error class
+and message parameters rather than an arbitrary error message.
## Usage
-To throw an exception, do the following.
-
1. Check if an appropriate error class already exists in `error-class.json`.
If true, skip to step 3. Otherwise, continue to step 2.
2. Add a new class to `error-class.json`; keep in mind the invariants below.
-3. Check if the exception type already extends `SparkError`.
+3. Check if the exception type already extends `SparkThrowable`.
If true, skip to step 5. Otherwise, continue to step 4.
-4. Mix `SparkError` into the exception.
+4. Mix `SparkThrowable` into the exception.
5. Throw the exception with the error class and message parameters.
### Before
-Throw exception:
+Throw with arbitrary error message:
throw new TestException("Problem A because B")
-
### After
`error-class.json`
@@ -34,26 +31,31 @@ Throw exception:
`SparkException.scala`
class SparkTestException(
- val errorClass: String,
- val messageParameters: Seq[String])
- extends TestException(SparkError.getMessage(errorClass, messageParameters))
- with SparkError
+ errorClass: String,
+ messageParameters: Seq[String])
+ extends TestException(SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ def getErrorClass: String = errorClass
+ def getMessageParameters: Array[String] = messageParameters
+ def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
+ }
-Throw exception:
+Throw with error class and message parameters:
throw new SparkTestException("PROBLEM_BECAUSE", Seq("A", "B"))
## Access fields
-To access error fields, catch exceptions that extend `org.apache.spark.SparkError` and access
- - Error class with `errorClass`
- - SQLSTATE with `sqlState`
+To access error fields, catch exceptions that extend `org.apache.spark.SparkThrowable` and access
+ - Error class with `getErrorClass`
+ - SQLSTATE with `getSqlState`
try {
...
} catch {
- case e: SparkError if e.sqlState.forall(_.startsWith("42")) =>
+ case e: SparkThrowable if Option(e.getSqlState).forall(_.startsWith("42")) =>
warn("Syntax error")
}
@@ -74,6 +76,9 @@ Invariants:
Error messages provide a descriptive, human-readable representation of the error.
The message format accepts string parameters via the C-style printf syntax.
+The quality of the error message should match the
+[guidelines](https://spark.apache.org/error-message-guidelines.html).
+
Invariants:
- Unique
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 384d936..02feb9d 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -15,6 +15,9 @@
"message" : [ "The second argument of '%s' function needs to be an integer." ],
"sqlState" : "22023"
},
+ "UNABLE_TO_ACQUIRE_MEMORY" : {
+ "message" : [ "Unable to acquire %s bytes of memory, got %s" ]
+ },
"WRITING_JOB_ABORTED" : {
"message" : [ "Writing job aborted" ],
"sqlState" : "40000"
diff --git a/core/src/main/scala/org/apache/spark/SparkError.scala b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
similarity index 66%
rename from core/src/main/scala/org/apache/spark/SparkError.scala
rename to core/src/main/scala/org/apache/spark/ErrorInfo.scala
index 4a23ac3..67a363c 100644
--- a/core/src/main/scala/org/apache/spark/SparkError.scala
+++ b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
@@ -42,10 +42,10 @@ private[spark] case class ErrorInfo(message: Seq[String], sqlState: Option[Strin
}
/**
- * Companion object used by instances of [[SparkError]] to access error class information and
+ * Companion object used by instances of [[SparkThrowable]] to access error class information and
* construct error messages.
*/
-private[spark] object SparkError {
+private[spark] object SparkThrowableHelper {
val errorClassesUrl: URL =
Utils.getSparkClassLoader.getResource("error/error-classes.json")
val errorClassToInfoMap: SortedMap[String, ErrorInfo] = {
@@ -55,29 +55,13 @@ private[spark] object SparkError {
mapper.readValue(errorClassesUrl, new TypeReference[SortedMap[String, ErrorInfo]]() {})
}
- def getMessage(errorClass: String, messageParameters: Seq[String]): String = {
+ def getMessage(errorClass: String, messageParameters: Array[String]): String = {
val errorInfo = errorClassToInfoMap.getOrElse(errorClass,
throw new IllegalArgumentException(s"Cannot find error class '$errorClass'"))
String.format(errorInfo.messageFormat, messageParameters: _*)
}
-}
-/**
- * Trait mixed into exceptions thrown from Spark.
- * - For backwards compatibility, existing exception types can be thrown with an arbitrary error
- * message with no error class. See [[SparkException]].
- * - To promote standardization, exceptions should be thrown with an error class and message
- * parameters to construct an error message with SparkError.getMessage(). New exception types
- * should not accept arbitrary error messages. See [[SparkArithmeticException]].
- */
-trait SparkError extends Throwable {
- // Should be provided during Exception invocation
- val errorClass: Option[String]
- protected val messageParameters: Seq[String]
-
- // Derived from error class
- private val errorInfo: Option[ErrorInfo] =
- errorClass.flatMap(SparkError.errorClassToInfoMap.get)
- // None if the error class or SQLSTATE are not set
- val sqlState: Option[String] = errorInfo.flatMap(_.sqlState)
+ def getSqlState(errorClass: String): String = {
+ Option(errorClass).flatMap(errorClassToInfoMap.get).flatMap(_.sqlState).orNull
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index f8a199a..4137003 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -20,22 +20,25 @@ package org.apache.spark
class SparkException(
message: String,
cause: Throwable,
- val errorClass: Option[String],
- val messageParameters: Seq[String])
- extends Exception(message, cause) with SparkError {
+ errorClass: Option[String],
+ messageParameters: Array[String])
+ extends Exception(message, cause) with SparkThrowable {
def this(message: String, cause: Throwable) =
- this(message = message, cause = cause, errorClass = None, messageParameters = Seq.empty)
+ this(message = message, cause = cause, errorClass = None, messageParameters = Array.empty)
def this(message: String) =
this(message = message, cause = null)
- def this(errorClass: String, messageParameters: Seq[String], cause: Throwable) =
+ def this(errorClass: String, messageParameters: Array[String], cause: Throwable) =
this(
- message = SparkError.getMessage(errorClass, messageParameters),
+ message = SparkThrowableHelper.getMessage(errorClass, messageParameters),
cause = cause,
errorClass = Some(errorClass),
messageParameters = messageParameters)
+
+ override def getErrorClass: String = errorClass.orNull
+ override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass.orNull)
}
/**
@@ -69,12 +72,10 @@ private[spark] class SparkUpgradeException(version: String, message: String, cau
/**
* Arithmetic exception thrown from Spark with an error class.
*/
-class SparkArithmeticException(
- val errorClass: Option[String],
- val messageParameters: Seq[String])
- extends ArithmeticException(SparkError.getMessage(errorClass.get, messageParameters))
- with SparkError {
+class SparkArithmeticException(errorClass: String, messageParameters: Array[String])
+ extends ArithmeticException(SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
- def this(errorClass: String, messageParameters: Seq[String]) =
- this(errorClass = Some(errorClass), messageParameters = messageParameters)
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}
diff --git a/core/src/test/scala/org/apache/spark/SparkErrorSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
similarity index 78%
rename from core/src/test/scala/org/apache/spark/SparkErrorSuite.scala
rename to core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 7d8039d..473ecc6 100644
--- a/core/src/test/scala/org/apache/spark/SparkErrorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io.File
import java.util.IllegalFormatException
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.core.JsonParser.Feature.STRICT_DUPLICATE_DETECTION
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.SerializationFeature
@@ -27,12 +28,12 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkError._
+import org.apache.spark.SparkThrowableHelper._
/**
- * Test suite for Spark errors.
+ * Test suite for Spark Throwables.
*/
-class SparkErrorSuite extends SparkFunSuite {
+class SparkThrowableSuite extends SparkFunSuite {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -65,6 +66,7 @@ class SparkErrorSuite extends SparkFunSuite {
.enable(SerializationFeature.INDENT_OUTPUT)
.build()
val rewrittenString = mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
+ .setSerializationInclusion(Include.NON_ABSENT)
.writeValueAsString(errorClassToInfoMap)
assert(rewrittenString == errorClassFileContents)
}
@@ -94,12 +96,12 @@ class SparkErrorSuite extends SparkFunSuite {
test("Check if error class is missing") {
val ex1 = intercept[IllegalArgumentException] {
- getMessage("", Seq.empty)
+ getMessage("", Array.empty)
}
assert(ex1.getMessage == "Cannot find error class ''")
val ex2 = intercept[IllegalArgumentException] {
- getMessage("LOREM_IPSUM", Seq.empty)
+ getMessage("LOREM_IPSUM", Array.empty)
}
assert(ex2.getMessage == "Cannot find error class 'LOREM_IPSUM'")
}
@@ -107,28 +109,41 @@ class SparkErrorSuite extends SparkFunSuite {
test("Check if message parameters match message format") {
// Requires 2 args
intercept[IllegalFormatException] {
- getMessage("MISSING_COLUMN", Seq.empty)
+ getMessage("MISSING_COLUMN", Array.empty)
}
// Does not fail with too many args (expects 0 args)
- assert(getMessage("DIVIDE_BY_ZERO", Seq("foo", "bar")) == "divide by zero")
+ assert(getMessage("DIVIDE_BY_ZERO", Array("foo", "bar")) == "divide by zero")
}
test("Error message is formatted") {
- assert(getMessage("MISSING_COLUMN", Seq("foo", "bar")) ==
+ assert(getMessage("MISSING_COLUMN", Array("foo", "bar")) ==
"cannot resolve 'foo' given input columns: [bar]")
}
- test("Try catching SparkError") {
+ test("Try catching legacy SparkError") {
+ try {
+ throw new SparkException("Arbitrary legacy message")
+ } catch {
+ case e: SparkThrowable =>
+ assert(e.getErrorClass == null)
+ assert(e.getSqlState == null)
+ case _: Throwable =>
+ // Should not end up here
+ assert(false)
+ }
+ }
+
+ test("Try catching SparkError with error class") {
try {
throw new SparkException(
errorClass = "WRITING_JOB_ABORTED",
- messageParameters = Seq.empty,
+ messageParameters = Array.empty,
cause = null)
} catch {
- case e: SparkError =>
- assert(e.errorClass.contains("WRITING_JOB_ABORTED"))
- assert(e.sqlState.contains("40000"))
+ case e: SparkThrowable =>
+ assert(e.getErrorClass == "WRITING_JOB_ABORTED")
+ assert(e.getSqlState == "40000")
case _: Throwable =>
// Should not end up here
assert(false)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index 49055ab..b33708d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -107,8 +107,11 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
// at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
// - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912
// because the size after growing exceeds size limitation 2147483632
- intercept[SparkOutOfMemoryError] {
+ val e = intercept[SparkOutOfMemoryError] {
sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
}
+ assert(e.getMessage == "Unable to acquire 800 bytes of memory, got 400")
+ assert(e.getErrorClass == "UNABLE_TO_ACQUIRE_MEMORY")
+ assert(e.getSqlState == null)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index e46c24b..6299431 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.spark.SparkError
+import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -35,23 +35,23 @@ class AnalysisException protected[sql] (
@transient val plan: Option[LogicalPlan] = None,
val cause: Option[Throwable] = None,
val errorClass: Option[String] = None,
- val messageParameters: Seq[String] = Seq.empty)
- extends Exception(message, cause.orNull) with SparkError with Serializable {
+ val messageParameters: Array[String] = Array.empty)
+ extends Exception(message, cause.orNull) with SparkThrowable with Serializable {
- def this(errorClass: String, messageParameters: Seq[String], cause: Option[Throwable]) =
+ def this(errorClass: String, messageParameters: Array[String], cause: Option[Throwable]) =
this(
- SparkError.getMessage(errorClass, messageParameters),
+ SparkThrowableHelper.getMessage(errorClass, messageParameters),
errorClass = Some(errorClass),
messageParameters = messageParameters,
cause = cause)
def this(
errorClass: String,
- messageParameters: Seq[String],
+ messageParameters: Array[String],
line: Option[Int],
startPosition: Option[Int]) =
this(
- SparkError.getMessage(errorClass, messageParameters),
+ SparkThrowableHelper.getMessage(errorClass, messageParameters),
line = line,
startPosition = startPosition,
errorClass = Some(errorClass),
@@ -64,7 +64,7 @@ class AnalysisException protected[sql] (
plan: Option[LogicalPlan] = this.plan,
cause: Option[Throwable] = this.cause,
errorClass: Option[String] = this.errorClass,
- messageParameters: Seq[String] = this.messageParameters): AnalysisException =
+ messageParameters: Array[String] = this.messageParameters): AnalysisException =
new AnalysisException(message, line, startPosition, plan, cause, errorClass, messageParameters)
def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
@@ -87,4 +87,7 @@ class AnalysisException protected[sql] (
} else {
message
}
+
+ override def getErrorClass: String = errorClass.orNull
+ override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass.orNull)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a90077b..e439085 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -183,7 +183,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case a: Attribute if !a.resolved =>
val from = operator.inputSet.toSeq.map(_.qualifiedName).mkString(", ")
// cannot resolve '${a.sql}' given input columns: [$from]
- a.failAnalysis(errorClass = "MISSING_COLUMN", messageParameters = Seq(a.sql, from))
+ a.failAnalysis(errorClass = "MISSING_COLUMN", messageParameters = Array(a.sql, from))
case s: Star =>
withPosition(s) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index f704b84..8ad8706 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -47,7 +47,7 @@ package object analysis {
throw new AnalysisException(msg, t.origin.line, t.origin.startPosition, cause = Some(cause))
}
- def failAnalysis(errorClass: String, messageParameters: Seq[String]): Nothing = {
+ def failAnalysis(errorClass: String, messageParameters: Array[String]): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 689a97e..64216e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -21,7 +21,7 @@ import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree.TerminalNodeImpl
-import org.apache.spark.SparkError
+import org.apache.spark.SparkThrowableHelper
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
@@ -126,8 +126,8 @@ abstract class AbstractSqlParser extends ParserInterface with SQLConfHelper with
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
- throw new ParseException(
- Option(command), e.message, position, position, e.errorClass, e.messageParameters)
+ throw new ParseException(Option(command), e.message, position, position,
+ e.errorClass, e.messageParameters)
}
}
}
@@ -215,7 +215,7 @@ class ParseException(
val start: Origin,
val stop: Origin,
errorClass: Option[String] = None,
- messageParameters: Seq[String] = Seq.empty)
+ messageParameters: Array[String] = Array.empty)
extends AnalysisException(
message,
start.line,
@@ -232,9 +232,9 @@ class ParseException(
ParserUtils.position(ctx.getStop))
}
- def this(errorClass: String, messageParameters: Seq[String], ctx: ParserRuleContext) =
+ def this(errorClass: String, messageParameters: Array[String], ctx: ParserRuleContext) =
this(Option(ParserUtils.command(ctx)),
- SparkError.getMessage(errorClass, messageParameters),
+ SparkThrowableHelper.getMessage(errorClass, messageParameters),
ParserUtils.position(ctx.getStart),
ParserUtils.position(ctx.getStop),
Some(errorClass),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index ca47202..4f82e25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1520,7 +1520,7 @@ private[spark] object QueryCompilationErrors {
// The second argument of '{function}' function needs to be an integer
new AnalysisException(
errorClass = "SECOND_FUNCTION_ARGUMENT_NOT_INTEGER",
- messageParameters = Seq(function),
+ messageParameters = Array(function),
cause = Some(e))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index e0dd46d..2435414 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -135,7 +135,7 @@ object QueryExecutionErrors {
}
def divideByZeroError(): ArithmeticException = {
- new SparkArithmeticException("DIVIDE_BY_ZERO", Seq.empty)
+ new SparkArithmeticException(errorClass = "DIVIDE_BY_ZERO", messageParameters = Array.empty)
}
def invalidArrayIndexError(index: Int, numElements: Int): ArrayIndexOutOfBoundsException = {
@@ -607,7 +607,7 @@ object QueryExecutionErrors {
def writingJobAbortedError(e: Throwable): Throwable = {
new SparkException(
errorClass = "WRITING_JOB_ABORTED",
- messageParameters = Seq.empty,
+ messageParameters = Array.empty,
cause = e)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 581a469..999ed70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -321,7 +321,7 @@ object QueryParsingErrors {
def duplicateKeysError(key: String, ctx: ParserRuleContext): Throwable = {
// Found duplicate keys '$key'
- new ParseException(errorClass = "DUPLICATE_KEY", messageParameters = Seq(key), ctx)
+ new ParseException(errorClass = "DUPLICATE_KEY", messageParameters = Array(key), ctx)
}
def unexpectedFomatForSetConfigurationError(ctx: SetConfigurationContext): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 50df8f4..b42d48d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -285,8 +285,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
input.write.format(cls.getName).option("path", path).mode("overwrite").save()
}
assert(e3.getMessage.contains("Writing job aborted"))
- assert(e3.errorClass.contains("WRITING_JOB_ABORTED"))
- assert(e3.sqlState.contains("40000"))
+ assert(e3.getErrorClass == "WRITING_JOB_ABORTED")
+ assert(e3.getSqlState == "40000")
// make sure we don't have partial data.
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala
index f68b4ee..4d786fd 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.RejectedExecutionException
import org.apache.hive.service.ServiceException
import org.apache.hive.service.cli.{HiveSQLException, OperationType}
-import org.apache.spark.SparkError
+import org.apache.spark.SparkThrowable
/**
* Object for grouping error messages from (most) exceptions thrown during
@@ -37,10 +37,10 @@ object HiveThriftServerErrors {
}
def runningQueryError(e: Throwable): Throwable = e match {
- case se: SparkError =>
- val errorClassPrefix = se.errorClass.map(e => s"[$e] ").getOrElse("")
+ case st: SparkThrowable =>
+ val errorClassPrefix = Option(st.getErrorClass).map(e => s"[$e] ").getOrElse("")
new HiveSQLException(
- s"Error running query: ${errorClassPrefix}${se.toString}", se.sqlState.orNull, se)
+ s"Error running query: ${errorClassPrefix}${st.toString}", st.getSqlState, st)
case _ => new HiveSQLException(s"Error running query: ${e.toString}", e)
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 830e93a..f42803d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
-import org.apache.spark.SparkError
+import org.apache.spark.SparkThrowable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
@@ -71,9 +71,9 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
} catch {
- case se: SparkError =>
- logDebug(s"Failed in [$command]", se)
- new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(se), se.sqlState.orNull, se)
+ case st: SparkThrowable =>
+ logDebug(s"Failed in [$command]", st)
+ new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
case cause: Throwable =>
logError(s"Failed in [$command]", cause)
new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org