You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/04 05:50:29 UTC

[spark] branch master updated: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5741d38ee27 [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
5741d38ee27 is described below

commit 5741d38ee272418a919fe7d102514c221a6e741a
Author: SandishKumarHN <sa...@gmail.com>
AuthorDate: Fri Nov 4 14:50:12 2022 +0900

    [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
    
    This is the follow-up PR to https://github.com/apache/spark/pull/37972 and https://github.com/apache/spark/pull/38212
    
    ### What changes were proposed in this pull request?
    1. Move spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
    2. Support protobuf imports.
    3. Validate protobuf timestamp and duration types.
    
    ### Why are the changes needed?
    N/A
    
    ### Does this PR introduce _any_ user-facing change?
    None
    
    ### How was this patch tested?
    Existing tests should cover the validation of this PR.
    
    CC: rangadi mposdev21 gengliangwang
    
    Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.
    
    Authored-by: SandishKumarHN <sa...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/protobuf/ProtobufDataToCatalyst.scala      |  22 +--
 .../spark/sql/protobuf/ProtobufDeserializer.scala  |  31 ++--
 .../spark/sql/protobuf/ProtobufSerializer.scala    |  38 +++--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   | 117 ++++++++------
 .../sql/protobuf/utils/SchemaConverters.scala      |  32 ++--
 .../src/test/resources/protobuf/basicmessage.proto |  39 +++++
 .../test/resources/protobuf/catalyst_types.proto   |   2 -
 .../src/test/resources/protobuf/duration.proto     |  26 +++
 .../test/resources/protobuf/functions_suite.desc   | Bin 5958 -> 6678 bytes
 .../test/resources/protobuf/functions_suite.proto  |  35 ++--
 .../src/test/resources/protobuf/nestedenum.proto   |  28 ++++
 .../src/test/resources/protobuf/timestamp.proto    |  26 +++
 .../ProtobufCatalystDataConversionSuite.scala      |   2 +-
 .../sql/protobuf/ProtobufFunctionsSuite.scala      |  35 ++--
 .../spark/sql/protobuf/ProtobufSerdeSuite.scala    |  85 +++++++---
 core/src/main/resources/error/error-classes.json   | 107 ++++++++++++-
 project/SparkBuild.scala                           |   2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 177 ++++++++++++++++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |   8 +
 .../sql/errors/QueryCompilationErrorsSuite.scala   |   4 +-
 20 files changed, 625 insertions(+), 191 deletions(-)

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index cad2442f10c..c0997b1bd06 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -21,11 +21,10 @@ import scala.util.control.NonFatal
 
 import com.google.protobuf.DynamicMessage
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
 
@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val parseMode: ParseMode = {
     val mode = protobufOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
-      throw new AnalysisException(unacceptableModeMessage(mode.name))
+      throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, mode)
     }
     mode
   }
 
-  private def unacceptableModeMessage(name: String): String = {
-    s"from_protobuf() doesn't support the $name mode. " +
-      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
-  }
-
   @transient private lazy val nullResultRow: Any = dataType match {
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case PermissiveMode =>
         nullResultRow
       case FailFastMode =>
-        throw new SparkException(
-          "Malformed records are detected in record parsing. " +
-            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
-            "result, try setting the option 'mode' as 'PERMISSIVE'.",
-          e)
+        throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
       case _ =>
-        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+        throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, parseMode)
     }
   }
 
@@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
         case Some(number) =>
           // Unknown fields contain a field with same number as a known field. Must be due to
           // mismatch of schema between writer and reader here.
-          throw new IllegalArgumentException(s"Type mismatch encountered for field:" +
-              s" ${messageDescriptor.getFields.get(number)}")
+          throw QueryCompilationErrors.protobufFieldTypeMismatchError(
+            messageDescriptor.getFields.get(number).toString)
         case None =>
       }
 
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index 0403b741ebf..46366ba268b 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -22,6 +22,7 @@ import com.google.protobuf.{ByteString, DynamicMessage, Message}
 import com.google.protobuf.Descriptors._
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
@@ -29,7 +30,6 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoMatchedField
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.toFieldStr
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -61,10 +61,10 @@ private[sql] class ProtobufDeserializer(
           }
       }
     } catch {
-      case ise: IncompatibleSchemaException =>
-        throw new IncompatibleSchemaException(
-          s"Cannot convert Protobuf type ${rootDescriptor.getName} " +
-            s"to SQL type ${rootCatalystType.sql}.",
+      case ise: AnalysisException =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError(
+          rootDescriptor.getName,
+          rootCatalystType,
           ise)
     }
 
@@ -152,11 +152,6 @@ private[sql] class ProtobufDeserializer(
       catalystType: DataType,
       protoPath: Seq[String],
       catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
-    val errorPrefix = s"Cannot convert Protobuf ${toFieldStr(protoPath)} to " +
-      s"SQL ${toFieldStr(catalystPath)} because "
-    val incompatibleMsg = errorPrefix +
-      s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
-      s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"
 
     (protoType.getJavaType, catalystType) match {
 
@@ -175,8 +170,9 @@ private[sql] class ProtobufDeserializer(
       case (INT, ShortType) =>
         (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])
 
-      case  (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
-      ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
+      case  (
+        BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
+        ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
         newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)
 
       case (LONG, LongType) =>
@@ -199,7 +195,8 @@ private[sql] class ProtobufDeserializer(
         (updater, ordinal, value) =>
           val byte_array = value match {
             case s: ByteString => s.toByteArray
-            case _ => throw new Exception("Invalid ByteString format")
+            case unsupported =>
+              throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
           }
           updater.set(ordinal, byte_array)
 
@@ -244,7 +241,13 @@ private[sql] class ProtobufDeserializer(
       case (ENUM, StringType) =>
         (updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))
 
-      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case _ =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
+          toFieldStr(protoPath),
+          catalystPath,
+          s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
+            s" ${protoType.getType}",
+          catalystType)
     }
   }
 
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
index 5d9af92c5c0..0f87c640b19 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
@@ -23,13 +23,14 @@ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.{toFieldStr, ProtoMatchedField}
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.types._
 
 /**
@@ -53,10 +54,10 @@ private[sql] class ProtobufSerializer(
             newStructConverter(st, rootDescriptor, Nil, Nil).asInstanceOf[Any => Any]
         }
       } catch {
-        case ise: IncompatibleSchemaException =>
-          throw new IncompatibleSchemaException(
-            s"Cannot convert SQL type ${rootCatalystType.sql} to Protobuf type " +
-              s"${rootDescriptor.getName}.",
+        case ise: AnalysisException =>
+          throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError(
+            rootDescriptor.getName,
+            rootCatalystType,
             ise)
       }
     if (nullable) { (data: Any) =>
@@ -77,8 +78,6 @@ private[sql] class ProtobufSerializer(
       fieldDescriptor: FieldDescriptor,
       catalystPath: Seq[String],
       protoPath: Seq[String]): Converter = {
-    val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " +
-      s"to Protobuf ${toFieldStr(protoPath)} because "
     (catalystType, fieldDescriptor.getJavaType) match {
       case (NullType, _) =>
         (getter, ordinal) => null
@@ -104,10 +103,11 @@ private[sql] class ProtobufSerializer(
         (getter, ordinal) =>
           val data = getter.getUTF8String(ordinal).toString
           if (!enumSymbols.contains(data)) {
-            throw new IncompatibleSchemaException(
-              errorPrefix +
-                s""""$data" cannot be written since it's not defined in enum """ +
-                enumSymbols.mkString("\"", "\", \"", "\""))
+            throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError(
+              catalystPath,
+              toFieldStr(protoPath),
+              data,
+              enumSymbols.mkString("\"", "\", \"", "\""))
           }
           fieldDescriptor.getEnumType.findValueByName(data)
       case (StringType, STRING) =>
@@ -124,7 +124,8 @@ private[sql] class ProtobufSerializer(
       case (TimestampType, MESSAGE) =>
         (getter, ordinal) =>
           val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-          Timestamp.newBuilder()
+          Timestamp
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
             .build()
@@ -201,7 +202,8 @@ private[sql] class ProtobufSerializer(
           val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString)
 
           val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds)
-          val duration = Duration.newBuilder()
+          val duration = Duration
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
 
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
           duration.build()
 
       case _ =>
-        throw new IncompatibleSchemaException(
-          errorPrefix +
-            s"schema is incompatible (sqlType = ${catalystType.sql}, " +
-            s"protoType = ${fieldDescriptor.getJavaType})")
+        throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+          catalystPath,
+          toFieldStr(protoPath),
+          catalystType,
+          s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
+            s" ${fieldDescriptor.getType}")
     }
   }
 
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index fa2ec9b7cd4..4bd59ddce6c 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -23,11 +23,12 @@ import java.util.Locale
 import scala.collection.JavaConverters._
 
 import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message}
+import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
 import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -61,9 +62,9 @@ private[sql] object ProtobufUtils extends Logging {
       protoPath: Seq[String],
       catalystPath: Seq[String]) {
     if (descriptor.getName == null) {
-      throw new IncompatibleSchemaException(
-        s"Attempting to treat ${descriptor.getName} as a RECORD, " +
-          s"but it was: ${descriptor.getContainingType}")
+      throw QueryCompilationErrors.unknownProtobufMessageTypeError(
+        descriptor.getName,
+        descriptor.getContainingType().getName)
     }
 
     private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
@@ -79,30 +80,29 @@ private[sql] object ProtobufUtils extends Logging {
 
     /**
      * Validate that there are no Catalyst fields which don't have a matching Protobuf field,
-     * throwing [[IncompatibleSchemaException]] if such extra fields are found. If
-     * `ignoreNullable` is false, consider nullable Catalyst fields to be eligible to be an extra
-     * field; otherwise, ignore nullable Catalyst fields when checking for extras.
+     * throwing [[AnalysisException]] if such extra fields are found. If `ignoreNullable` is
+     * false, consider nullable Catalyst fields to be eligible to be an extra field; otherwise,
+     * ignore nullable Catalyst fields when checking for extras.
      */
     def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit =
       catalystSchema.fields.foreach { sqlField =>
         if (getFieldByName(sqlField.name).isEmpty &&
           (!ignoreNullable || !sqlField.nullable)) {
-          throw new IncompatibleSchemaException(
-            s"Cannot find ${toFieldStr(catalystPath :+ sqlField.name)} in Protobuf schema")
+          throw QueryCompilationErrors.cannotFindCatalystTypeInProtobufSchemaError(
+            toFieldStr(catalystPath :+ sqlField.name))
         }
       }
 
     /**
      * Validate that there are no Protobuf fields which don't have a matching Catalyst field,
-     * throwing [[IncompatibleSchemaException]] if such extra fields are found. Only required
-     * (non-nullable) fields are checked; nullable fields are ignored.
+     * throwing [[AnalysisException]] if such extra fields are found. Only required (non-nullable)
+     * fields are checked; nullable fields are ignored.
      */
     def validateNoExtraRequiredProtoFields(): Unit = {
       val extraFields = protoFieldArray.toSet -- matchedFields.map(_.fieldDescriptor)
       extraFields.filterNot(isNullable).foreach { extraField =>
-        throw new IncompatibleSchemaException(
-          s"Found ${toFieldStr(protoPath :+ extraField.getName())} in Protobuf schema " +
-            "but there is no match in the SQL schema")
+        throw QueryCompilationErrors.cannotFindProtobufFieldInCatalystError(
+          toFieldStr(protoPath :+ extraField.getName()))
       }
     }
 
@@ -125,10 +125,11 @@ private[sql] object ProtobufUtils extends Logging {
         case Seq(protoField) => Some(protoField)
         case Seq() => None
         case matches =>
-          throw new IncompatibleSchemaException(
-            s"Searching for '$name' in " +
-              s"Protobuf schema at ${toFieldStr(protoPath)} gave ${matches.size} matches. " +
-              s"Candidates: " + matches.map(_.getName()).mkString("[", ", ", "]"))
+          throw QueryCompilationErrors.protobufFieldMatchError(
+            name,
+            toFieldStr(protoPath),
+            s"${matches.size}",
+            matches.map(_.getName()).mkString("[", ", ", "]"))
       }
     }
   }
@@ -157,16 +158,12 @@ private[sql] object ProtobufUtils extends Logging {
     val protobufClass = try {
       Utils.classForName(protobufClassName)
     } catch {
-      case _: ClassNotFoundException =>
-        val hasDots = protobufClassName.contains(".")
-        throw new IllegalArgumentException(
-          s"Could not load Protobuf class with name '$protobufClassName'" +
-          (if (hasDots) "" else ". Ensure the class name includes package prefix.")
-        )
+      case e: ClassNotFoundException =>
+        throw QueryCompilationErrors.protobufClassLoadError(protobufClassName, e)
     }
 
     if (!classOf[Message].isAssignableFrom(protobufClass)) {
-      throw new IllegalArgumentException(s"$protobufClassName is not a Protobuf message type")
+      throw QueryCompilationErrors.protobufMessageTypeError(protobufClassName)
       // TODO: Need to support V2. This might work with V2 classes too.
     }
 
@@ -178,46 +175,70 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
-    }
+    // Find the first message descriptor that matches the name.
+    val descriptorOpt = parseFileDescriptorSet(descFilePath)
+      .flatMap { fileDesc =>
+        fileDesc.getMessageTypes.asScala.find { desc =>
+          desc.getName == messageName || desc.getFullName == messageName
+        }
+      }.headOption
 
-    descriptor match {
+    descriptorOpt match {
       case Some(d) => d
-      case None =>
-        throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor")
+      case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
     }
   }
 
-  private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
+  private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
     var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
     try {
       val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(descFilePath, ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
-      if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
-      }
-      fileDescriptor
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptorList: List[Descriptors.FileDescriptor] =
+        fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
+          buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
+        ).toList
+      fileDescriptorList
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e)
+    }
+  }
+
+  /**
+   * Recursively builds the file descriptors for all dependencies of the given
+   * FileDescriptorProto, then builds and returns the file descriptor for the proto itself.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
     }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from each file descriptor proto's name (as stored in the set) to that proto.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto
+    }.toMap[String, FileDescriptorProto]
   }
 
   /**
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
index 4fca06fb5d8..6fcba3b8918 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.ScalaReflectionLock
 import org.apache.spark.sql.types._
 
@@ -62,14 +63,18 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE
+        if (fd.getMessageType.getName == "Duration" &&
+          fd.getMessageType.getFields.size() == 2 &&
+          fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+          fd.getMessageType.getFields.get(1).getName.equals("nanos")) =>
         Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
-        Some(TimestampType)
-        // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
-        //        expected to be TimestampType in Spark. How about verifying fields?
-        //        Same for "Duration". Only the Timestamp & Duration protos defined in
-        //        google.protobuf package should default to corresponding Catalylist types.
+      case MESSAGE
+        if (fd.getMessageType.getName == "Timestamp" &&
+          fd.getMessageType.getFields.size() == 2 &&
+          fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+          fd.getMessageType.getFields.get(1).getName.equals("nanos")) =>
+          Some(TimestampType)
       case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry =>
         var keyType: DataType = NullType
         var valueType: DataType = NullType
@@ -88,9 +93,7 @@ object SchemaConverters {
             nullable = false))
       case MESSAGE =>
         if (existingRecordNames.contains(fd.getFullName)) {
-          throw new IncompatibleSchemaException(s"""
-               |Found recursive reference in Protobuf schema, which can not be processed by Spark:
-               |${fd.toString()}""".stripMargin)
+          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
         }
         val newRecordNames = existingRecordNames + fd.getFullName
 
@@ -100,10 +103,8 @@ object SchemaConverters {
             .toSeq)
           .filter(_.nonEmpty)
           .map(StructType.apply)
-      case _ =>
-        throw new IncompatibleSchemaException(
-          s"Cannot convert Protobuf type" +
-            s" ${fd.getJavaType}")
+      case other =>
+        throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString)
     }
     dataType.map(dt =>
       StructField(
@@ -111,7 +112,4 @@ object SchemaConverters {
         if (fd.isRepeated) ArrayType(dt, containsNull = false) else dt,
         nullable = !fd.isRequired && !fd.isRepeated))
   }
-
-  private[protobuf] class IncompatibleSchemaException(msg: String, ex: Throwable = null)
-      extends Exception(msg, ex)
 }
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
new file mode 100644
index 00000000000..4252f349cf0
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// cd connector/protobuf/src/test/resources/protobuf
+// protoc --java_out=./ basicmessage.proto
+// protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+import "nestedenum.proto";
+
+option java_outer_classname = "BasicMessageProto";
+
+message BasicMessage {
+  int64 id = 1;
+  string string_value = 2;
+  int32 int32_value = 3;
+  int64 int64_value = 4;
+  double double_value = 5;
+  float float_value = 6;
+  bool bool_value = 7;
+  bytes bytes_value = 8;
+  NestedEnum rnested_enum = 9;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
index 1deb193438c..0732de10858 100644
--- a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
+++ b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
@@ -22,8 +22,6 @@ syntax = "proto3";
 package org.apache.spark.sql.protobuf.protos;
 option java_outer_classname = "CatalystTypes";
 
-// TODO: import one or more protobuf files.
-
 message BooleanMsg {
   bool bool_type = 1;
 }
diff --git a/connector/protobuf/src/test/resources/protobuf/duration.proto b/connector/protobuf/src/test/resources/protobuf/duration.proto
new file mode 100644
index 00000000000..2e89a8db5b7
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/duration.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "DurationProto";
+
+message Duration {
+  int64 seconds = 1;
+  int32 nanos = 2;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc
index 6e3a3967277..d54ee4337a5 100644
Binary files a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc and b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc differ
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
index 60f8c262141..2fef8495c5e 100644
--- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
@@ -15,13 +15,18 @@
  * limitations under the License.
  */
 // To compile and create test class:
-// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
-// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/functions_suite.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+// cd connector/protobuf/src/test/resources/protobuf
+// protoc --java_out=./ functions_suite.proto
+// protoc --include_imports --descriptor_set_out=functions_suite.desc --java_out=org/apache/spark/sql/protobuf/ functions_suite.proto
 
 syntax = "proto3";
 
 package org.apache.spark.sql.protobuf.protos;
 
+import "timestamp.proto";
+import "duration.proto";
+import "basicmessage.proto";
+
 option java_outer_classname = "SimpleMessageProtos";
 
 message SimpleMessageJavaTypes {
@@ -58,7 +63,7 @@ message SimpleMessageRepeated {
   string key = 1;
   string value = 2;
   enum NestedEnum {
-    ESTED_NOTHING = 0;
+    NESTED_NOTHING = 0;
     NESTED_FIRST = 1;
     NESTED_SECOND = 2;
   }
@@ -72,17 +77,6 @@ message SimpleMessageRepeated {
   repeated NestedEnum rnested_enum = 10;
 }
 
-message BasicMessage {
-  int64 id = 1;
-  string string_value = 2;
-  int32 int32_value = 3;
-  int64 int64_value = 4;
-  double double_value = 5;
-  float float_value = 6;
-  bool bool_value = 7;
-  bytes bytes_value = 8;
-}
-
 message RepeatedMessage {
   repeated BasicMessage basic_message = 1;
 }
@@ -119,7 +113,7 @@ message SimpleMessageEnum {
   string key = 1;
   string value = 2;
   enum NestedEnum {
-    ESTED_NOTHING = 0; // TODO: Fix the name.
+    NESTED_NOTHING = 0;
     NESTED_FIRST = 1;
     NESTED_SECOND = 2;
   }
@@ -168,21 +162,10 @@ message requiredMsg {
   int32 col_3 = 4;
 }
 
-// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto
-message Timestamp {
-  int64 seconds = 1;
-  int32 nanos = 2;
-}
-
 message timeStampMsg {
   string key = 1;
   Timestamp stmp = 2;
 }
-// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/duration.proto
-message Duration {
-  int64 seconds = 1;
-  int32 nanos = 2;
-}
 
 message durationMsg {
   string key = 1;
diff --git a/connector/protobuf/src/test/resources/protobuf/nestedenum.proto b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto
new file mode 100644
index 00000000000..20e9005bec0
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "NestedEnumProto";
+
+// Top-level enum with three values; NESTED_NOTHING is the zero value.
+enum NestedEnum {
+  NESTED_NOTHING = 0;
+  NESTED_FIRST = 1;
+  NESTED_SECOND = 2;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/timestamp.proto b/connector/protobuf/src/test/resources/protobuf/timestamp.proto
new file mode 100644
index 00000000000..7616cc2ccfc
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/timestamp.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "TimestampProto";
+
+// Test-local Timestamp message: a whole-second count plus a nanosecond component.
+// Same two fields as the message previously inlined in functions_suite.proto, which cited
+// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto
+message Timestamp {
+  int64 seconds = 1;
+  int32 nanos = 2;
+}
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 19774a2ad07..271c5b0fec8 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -36,7 +36,7 @@ class ProtobufCatalystDataConversionSuite
     with SharedSparkSession
     with ExpressionEvalHelper {
 
-  private val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+  private val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
   private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.CatalystTypes$"
 
   private def checkResultWithEval(
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 72280fb0d9e..199ef235f14 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.{ByteString, DynamicMessage}
 
 import org.apache.spark.sql.{Column, QueryTest, Row}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions.{lit, struct}
 import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
 import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated.NestedEnum
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DayTimeIntervalType, IntegerType, StringType, StructField, StructType, TimestampType}
 
@@ -36,7 +36,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
 
   import testImplicits._
 
-  val testFileDesc = testFile("protobuf/functions_suite.desc").replace("file:/", "/")
+  val testFileDesc = testFile("functions_suite.desc").replace("file:/", "/")
   private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$"
 
   /**
@@ -114,7 +114,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
       .addRdoubleValue(1092093.654d)
       .addRfloatValue(10903.0f)
       .addRfloatValue(10902.0f)
-      .addRnestedEnum(NestedEnum.ESTED_NOTHING)
+      .addRnestedEnum(NestedEnum.NESTED_NOTHING)
       .addRnestedEnum(NestedEnum.NESTED_FIRST)
       .build()
 
@@ -324,7 +324,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
       .setField(messageEnumDesc.findFieldByName("value"), "value")
       .setField(
         messageEnumDesc.findFieldByName("nested_enum"),
-        messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
+        messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_NOTHING"))
       .setField(
         messageEnumDesc.findFieldByName("nested_enum"),
         messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
@@ -410,7 +410,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
 
     checkWithFileAndClassName("recursiveB") {
       case (name, descFilePathOpt) =>
-        val e = intercept[IncompatibleSchemaException] {
+        val e = intercept[AnalysisException] {
           df.select(
             from_protobuf_wrapper($"messageB", name, descFilePathOpt).as("messageFromProto"))
             .show()
@@ -446,7 +446,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
 
     checkWithFileAndClassName("recursiveD") {
       case (name, descFilePathOpt) =>
-        val e = intercept[IncompatibleSchemaException] {
+        val e = intercept[AnalysisException] {
           df.select(
             from_protobuf_wrapper($"messageD", name, descFilePathOpt).as("messageFromProto"))
             .show()
@@ -458,7 +458,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
   }
 
   test("Handle extra fields : oldProducer -> newConsumer") {
-    val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+    val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
     val oldProducer = ProtobufUtils.buildDescriptor(testFileDesc, "oldProducer")
     val newConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "newConsumer")
 
@@ -498,7 +498,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
   }
 
   test("Handle extra fields : newProducer -> oldConsumer") {
-    val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+    val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
     val newProducer = ProtobufUtils.buildDescriptor(testFileDesc, "newProducer")
     val oldConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "oldConsumer")
 
@@ -587,19 +587,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
 
     val df = Seq(basicMessage.toByteArray).toDF("value")
 
-    checkWithFileAndClassName("BasicMessage") {
-      case (name, descFilePathOpt) =>
-        val resultFrom = df
-          .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
-          .where("sample.string_value == \"slam\"")
+    val resultFrom = df
+      .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample)
+      .where("sample.string_value == \"slam\"")
 
-        val resultToFrom = resultFrom
-          .select(to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'value)
-          .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
-          .where("sample.string_value == \"slam\"")
+    val resultToFrom = resultFrom
+      .select(to_protobuf_wrapper($"sample", "BasicMessage", Some(testFileDesc)) as 'value)
+      .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample)
+      .where("sample.string_value == \"slam\"")
 
-        assert(resultFrom.except(resultToFrom).isEmpty)
-    }
+    assert(resultFrom.except(resultToFrom).isEmpty)
   }
 
   test("Handle TimestampType between to_protobuf and from_protobuf") {
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index efc02524e68..840535654ed 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.protobuf
 import com.google.protobuf.Descriptors.Descriptor
 import com.google.protobuf.DynamicMessage
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.NoopFilters
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
@@ -35,7 +36,7 @@ class ProtobufSerdeSuite extends SharedSparkSession {
   import ProtoSerdeSuite._
   import ProtoSerdeSuite.MatchType._
 
-  val testFileDesc = testFile("protobuf/serde_suite.desc").replace("file:/", "/")
+  val testFileDesc = testFile("serde_suite.desc").replace("file:/", "/")
   private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$"
 
   test("Test basic conversion") {
@@ -65,22 +66,24 @@ class ProtobufSerdeSuite extends SharedSparkSession {
 
   test("Fail to convert with field type mismatch") {
     val protoFile = ProtobufUtils.buildDescriptor(testFileDesc, "MissMatchTypeInRoot")
-
     withFieldMatchType { fieldMatch =>
       assertFailedConversionMessage(
         protoFile,
         Deserializer,
         fieldMatch,
-        "Cannot convert Protobuf field 'foo' to SQL field 'foo' because schema is incompatible " +
-          s"(protoType = org.apache.spark.sql.protobuf.MissMatchTypeInRoot.foo " +
-          s"LABEL_OPTIONAL LONG INT64, sqlType = ${CATALYST_STRUCT.head.dataType.sql})".stripMargin)
+        errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+        params = Map(
+          "protobufType" -> "MissMatchTypeInRoot",
+          "toType" -> toSQLType(CATALYST_STRUCT)))
 
       assertFailedConversionMessage(
         protoFile,
         Serializer,
         fieldMatch,
-        s"Cannot convert SQL field 'foo' to Protobuf field 'foo' because schema is incompatible " +
-          s"""(sqlType = ${CATALYST_STRUCT.head.dataType.sql}, protoType = LONG)""")
+        errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+        params = Map(
+          "protobufType" -> "MissMatchTypeInRoot",
+          "toType" -> toSQLType(CATALYST_STRUCT)))
     }
   }
 
@@ -91,9 +94,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       .add("foo", new StructType().add("bar", IntegerType, nullable = false))
 
     // serialize fails whether or not 'bar' is nullable
-    val byNameMsg = "Cannot find field 'foo.bar' in Protobuf schema"
-    assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg)
-    assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg, nonnullCatalyst)
+    assertFailedConversionMessage(
+      protoFile,
+      Serializer,
+      BY_NAME,
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      params = Map(
+        "protobufType" -> "FieldMissingInProto",
+        "toType" -> toSQLType(CATALYST_STRUCT)))
+
+    assertFailedConversionMessage(protoFile,
+      Serializer,
+      BY_NAME,
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      params = Map(
+        "protobufType" -> "FieldMissingInProto",
+        "toType" -> toSQLType(nonnullCatalyst)))
   }
 
   test("Fail to convert with deeply nested field type mismatch") {
@@ -107,18 +123,21 @@ class ProtobufSerdeSuite extends SharedSparkSession {
         protoFile,
         Deserializer,
         fieldMatch,
-        s"Cannot convert Protobuf field 'top.foo.bar' to SQL field 'top.foo.bar' because schema " +
-          s"is incompatible (protoType = org.apache.spark.sql.protobuf.protos.TypeMiss.bar " +
-          s"LABEL_OPTIONAL LONG INT64, sqlType = INT)",
-        catalyst)
+        catalyst,
+        errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+        params = Map(
+          "protobufType" -> "MissMatchTypeInDeepNested",
+          "toType" -> toSQLType(catalyst)))
 
       assertFailedConversionMessage(
         protoFile,
         Serializer,
         fieldMatch,
-        "Cannot convert SQL field 'top.foo.bar' to Protobuf field 'top.foo.bar' because schema " +
-          """is incompatible (sqlType = INT, protoType = LONG)""",
-        catalyst)
+        catalyst,
+        errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+        params = Map(
+          "protobufType" -> "MissMatchTypeInDeepNested",
+          "toType" -> toSQLType(catalyst)))
     }
   }
 
@@ -130,7 +149,10 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       protoFile,
       Serializer,
       BY_NAME,
-      "Found field 'boo' in Protobuf schema but there is no match in the SQL schema")
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      params = Map(
+        "protobufType" -> "FieldMissingInSQLRoot",
+        "toType" -> toSQLType(CATALYST_STRUCT)))
 
     /* deserializing should work regardless of whether the extra field is missing
      in SQL Schema or not */
@@ -144,7 +166,10 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       protoNestedFile,
       Serializer,
       BY_NAME,
-      "Found field 'foo.baz' in Protobuf schema but there is no match in the SQL schema")
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      params = Map(
+        "protobufType" -> "FieldMissingInSQLNested",
+        "toType" -> toSQLType(CATALYST_STRUCT)))
 
     /* deserializing should work regardless of whether the extra field is missing
       in SQL Schema or not */
@@ -161,20 +186,28 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       protoSchema: Descriptor,
       serdeFactory: SerdeFactory[_],
       fieldMatchType: MatchType,
-      expectedCauseMessage: String,
-      catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+      catalystSchema: StructType = CATALYST_STRUCT,
+      errorClass: String,
+      params: Map[String, String]): Unit = {
+
+    val e = intercept[AnalysisException] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
+
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
 
     assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    checkError(
+      exception = e,
+      errorClass = errorClass,
+      parameters = params)
   }
 
   def withFieldMatchType(f: MatchType => Unit): Unit = {
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 053c2a7af21..7fc806752be 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -17,6 +17,31 @@
     ],
     "sqlState" : "22005"
   },
+  "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : {
+    "message" : [
+      "Error constructing FileDescriptor for <descFilePath>"
+    ]
+  },
+  "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : {
+    "message" : [
+      "Cannot convert Protobuf <protobufColumn> to SQL <sqlColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)."
+    ]
+  },
+  "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE" : {
+    "message" : [
+      "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because <data> cannot be written since it's not defined in ENUM <enumString>"
+    ]
+  },
+  "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE" : {
+    "message" : [
+      "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)."
+    ]
+  },
   "CANNOT_DECODE_URL" : {
     "message" : [
       "Cannot decode url : <url>."
@@ -29,12 +54,22 @@
     ],
     "sqlState" : "22007"
   },
+  "CANNOT_LOAD_PROTOBUF_CLASS" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName>. Ensure the class name includes package prefix."
+    ]
+  },
   "CANNOT_PARSE_DECIMAL" : {
     "message" : [
       "Cannot parse decimal"
     ],
     "sqlState" : "42000"
   },
+  "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : {
+    "message" : [
+      "Error parsing file <descFilePath> descriptor byte[] into Descriptor object"
+    ]
+  },
   "CANNOT_PARSE_TIMESTAMP" : {
     "message" : [
       "<message>. If necessary set <ansiConfig> to \"false\" to bypass this error."
@@ -551,6 +586,11 @@
       "Invalid bucket file: <path>"
     ]
   },
+  "INVALID_BYTE_STRING" : {
+    "message" : [
+      "The expected format is ByteString, but was <unsupported> (<class>)."
+    ]
+  },
   "INVALID_COLUMN_OR_FIELD_DATA_TYPE" : {
     "message" : [
       "Column or field <name> is of type <type> while it's required to be <expectedType>."
@@ -600,6 +640,11 @@
       "<value> is an invalid property value, please use quotes, e.g. SET <key>=<value>"
     ]
   },
+  "INVALID_PROTOBUF_MESSAGE_TYPE" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
   "INVALID_SQL_SYNTAX" : {
     "message" : [
       "Invalid SQL syntax: <inputString>"
@@ -618,6 +663,11 @@
       }
     }
   },
+  "MALFORMED_PROTOBUF_MESSAGE" : {
+    "message" : [
+      "Malformed Protobuf messages are detected in message deserialization. Parse Mode: <failFastMode>. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'."
+    ]
+  },
   "MISSING_STATIC_PARTITION_COLUMN" : {
     "message" : [
       "Unknown static partition column: <columnName>"
@@ -669,7 +719,12 @@
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
     ]
   },
-  "NO_UDF_INTERFACE_ERROR" : {
+  "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
+    "message" : [
+      "Cannot find <catalystFieldPath> in Protobuf schema"
+    ]
+  },
+  "NO_UDF_INTERFACE" : {
     "message" : [
       "UDF class <className> doesn't implement any UDF interface"
     ]
@@ -741,6 +796,46 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_DEPENDENCY_NOT_FOUND" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" : {
+    "message" : [
+      "Error reading Protobuf descriptor file at path: <filePath>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_NOT_FOUND" : {
+    "message" : [
+      "Unable to locate Message <messageName> in Descriptor"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "RECURSIVE_PROTOBUF_SCHEMA" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
   "RENAME_SRC_PATH_NOT_FOUND" : {
     "message" : [
       "Failed to rename as <sourcePath> was not found"
@@ -827,6 +922,16 @@
       "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>"
     ]
   },
+  "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE" : {
+    "message" : [
+      "Unable to convert SQL type <toType> to Protobuf type <protobufType>."
+    ]
+  },
+  "UNKNOWN_PROTOBUF_MESSAGE_TYPE" : {
+    "message" : [
+      "Attempting to treat <descriptorName> as a Message, but it was <containingType>"
+    ]
+  },
   "UNPIVOT_REQUIRES_ATTRIBUTES" : {
     "message" : [
       "UNPIVOT requires all given <given> expressions to be columns when no <empty> expressions are given. These are not columns: [<expressions>]."
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f61753f7b1d..18667d1efea 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -723,7 +723,7 @@ object SparkProtobuf {
 
     dependencyOverrides += "com.google.protobuf" % "protobuf-java" % protoVersion,
 
-    (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources",
+    (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources" / "protobuf",
 
     (Test / PB.targets) := Seq(
       PB.gens.java -> target.value / "generated-test-sources"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 03d16856755..ea2f961509a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3008,7 +3008,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def udfClassDoesNotImplementAnyUDFInterfaceError(className: String): Throwable = {
     new AnalysisException(
-      errorClass = "NO_UDF_INTERFACE_ERROR",
+      errorClass = "NO_UDF_INTERFACE",
       messageParameters = Map("className" -> className))
   }
 
@@ -3213,4 +3213,179 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  /**
+   * Builds the CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE error, reported when a
+   * Protobuf field and the SQL column it maps to have incompatible types.
+   *
+   * @param protobufColumn the Protobuf field, passed through to the message verbatim
+   * @param sqlColumn name parts of the SQL column, rendered with `toSQLId`
+   * @param protobufType textual description of the Protobuf field type
+   * @param sqlType the SQL data type, rendered with `toSQLType`
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: Seq[String],
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    val params = Map(
+      "protobufColumn" -> protobufColumn,
+      "sqlColumn" -> toSQLId(sqlColumn),
+      "protobufType" -> protobufType,
+      "sqlType" -> toSQLType(sqlType))
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE error, reported when a SQL
+   * column and the Protobuf field it maps to have incompatible types.
+   *
+   * @param sqlColumn name parts of the SQL column, rendered with `toSQLId`
+   * @param protobufColumn the Protobuf field, passed through to the message verbatim
+   * @param sqlType the SQL data type, rendered with `toSQLType`
+   * @param protobufType textual description of the Protobuf field type
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    val params = Map(
+      "sqlColumn" -> toSQLId(sqlColumn),
+      "protobufColumn" -> protobufColumn,
+      "sqlType" -> toSQLType(sqlType),
+      "protobufType" -> protobufType)
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE error, reported when a value
+   * cannot be written because it is not defined in the target Protobuf enum.
+   *
+   * @param sqlColumn name parts of the SQL column, rendered with `toSQLId`
+   * @param protobufColumn the Protobuf field, passed through to the message verbatim
+   * @param data the offending value, already rendered as text
+   * @param enumString textual description of the enum
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    val params = Map(
+      "sqlColumn" -> toSQLId(sqlColumn),
+      "protobufColumn" -> protobufColumn,
+      "data" -> data,
+      "enumString" -> enumString)
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE error, reported when a
+   * whole Protobuf message type cannot be converted to the requested SQL type.
+   *
+   * @param protobufType name of the Protobuf message type
+   * @param sqlType the target SQL data type, rendered with `toSQLType`
+   * @param cause the underlying failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      // Attach the failure itself. Using cause.getCause would skip one level of the chain
+      // and lose the detail entirely whenever the supplied exception has no nested cause.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE error, reported when a SQL type
+   * cannot be converted to the requested Protobuf message type.
+   *
+   * @param protobufType name of the Protobuf message type
+   * @param sqlType the source SQL data type, rendered with `toSQLType`
+   * @param cause the underlying failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      // Attach the failure itself. Using cause.getCause would skip one level of the chain
+      // and lose the detail entirely whenever the supplied exception has no nested cause.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the PROTOBUF_TYPE_NOT_SUPPORT error for a Protobuf type that is not yet supported.
+   *
+   * @param protobufType textual description of the unsupported Protobuf type
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    val params = Map("protobufType" -> protobufType)
+    new AnalysisException(errorClass = "PROTOBUF_TYPE_NOT_SUPPORT", messageParameters = params)
+  }
+
+  /**
+   * Builds the UNKNOWN_PROTOBUF_MESSAGE_TYPE error, reported when a descriptor is treated
+   * as a Message but turns out to be something else.
+   *
+   * @param descriptorName name of the descriptor that was looked up
+   * @param containingType what the descriptor actually was
+   * @return the exception to throw; nothing is thrown here
+   */
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    val params = Map(
+      "descriptorName" -> descriptorName,
+      "containingType" -> containingType)
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the NO_SQL_TYPE_IN_PROTOBUF_SCHEMA error, reported when a SQL field path has no
+   * counterpart in the Protobuf schema.
+   *
+   * @param catalystFieldPath the SQL field path that could not be found
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    val params = Map("catalystFieldPath" -> catalystFieldPath)
+    new AnalysisException(
+      errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA error, reported when a Protobuf field
+   * has no match in the SQL schema.
+   *
+   * @param field the Protobuf field that has no SQL counterpart
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    val params = Map("field" -> field)
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the PROTOBUF_FIELD_MISSING error, which reports how many candidates a field
+   * lookup in the Protobuf schema produced and what they were.
+   *
+   * @param field the field that was searched for
+   * @param protobufSchema location in the Protobuf schema that was searched
+   * @param matchSize number of matches found, already rendered as text
+   * @param matches the candidate matches, already rendered as text
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufFieldMatchError(
+      field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    val params = Map(
+      "field" -> field,
+      "protobufSchema" -> protobufSchema,
+      "matchSize" -> matchSize,
+      "matches" -> matches)
+    new AnalysisException(errorClass = "PROTOBUF_FIELD_MISSING", messageParameters = params)
+  }
+
+  /**
+   * Builds the PROTOBUF_MESSAGE_NOT_FOUND error, reported when a message cannot be located
+   * in a descriptor.
+   *
+   * @param messageName name of the message that was looked up
+   * @return the exception to throw; nothing is thrown here
+   */
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    val params = Map("messageName" -> messageName)
+    new AnalysisException(errorClass = "PROTOBUF_MESSAGE_NOT_FOUND", messageParameters = params)
+  }
+
+  /**
+   * Builds the CANNOT_PARSE_PROTOBUF_DESCRIPTOR error, reported when the bytes of a
+   * descriptor file cannot be parsed into a descriptor object.
+   *
+   * The method name keeps its historical spelling because callers reference it.
+   *
+   * @param descFilePath path of the descriptor file that failed to parse
+   * @param cause the underlying parse failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+      // Build the map with Map(...). `Map.empty(k -> v)` is `Map.empty.apply(key)`, which
+      // throws NoSuchElementException instead of supplying the <descFilePath> parameter.
+      messageParameters = Map("descFilePath" -> descFilePath),
+      // Attach the failure itself rather than its nested cause, which is usually null.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND error, reported when a Protobuf
+   * descriptor file cannot be read.
+   *
+   * @param filePath path of the descriptor file that could not be read
+   * @param cause the underlying read failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND",
+      messageParameters = Map("filePath" -> filePath),
+      // Attach the failure itself. Using cause.getCause would skip one level of the chain
+      // and lose the detail entirely whenever the supplied exception has no nested cause.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR error, reported when a FileDescriptor
+   * cannot be constructed for a descriptor file.
+   *
+   * @param descFilePath path of the descriptor file being processed
+   * @param cause the underlying failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+      // Build the map with Map(...). `Map.empty(k -> v)` is `Map.empty.apply(key)`, which
+      // throws NoSuchElementException instead of supplying the <descFilePath> parameter.
+      messageParameters = Map("descFilePath" -> descFilePath),
+      // Attach the failure itself rather than its nested cause, which is usually null.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the RECURSIVE_PROTOBUF_SCHEMA error, reported when a recursive reference is
+   * found in a Protobuf schema.
+   *
+   * @param fieldDescriptor textual description of the field where recursion was found
+   * @return the exception to throw; nothing is thrown here
+   */
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    val params = Map("fieldDescriptor" -> fieldDescriptor)
+    new AnalysisException(errorClass = "RECURSIVE_PROTOBUF_SCHEMA", messageParameters = params)
+  }
+
+  /**
+   * Builds the PROTOBUF_FIELD_TYPE_MISMATCH error for a field whose type does not match.
+   *
+   * @param field the field with the mismatched type
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    val params = Map("field" -> field)
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the CANNOT_LOAD_PROTOBUF_CLASS error, reported when a Protobuf class cannot be
+   * loaded by name.
+   *
+   * @param protobufClassName name of the class that failed to load
+   * @param cause the underlying load failure; attached as the cause of the returned exception
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufClassLoadError(
+      protobufClassName: String,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_LOAD_PROTOBUF_CLASS",
+      messageParameters = Map("protobufClassName" -> protobufClassName),
+      // Attach the failure itself. Using cause.getCause would skip one level of the chain
+      // and lose the detail entirely whenever the supplied exception has no nested cause.
+      cause = Option(cause))
+  }
+
+  /**
+   * Builds the INVALID_PROTOBUF_MESSAGE_TYPE error for a class that is not a Protobuf
+   * message type.
+   *
+   * @param protobufClassName name of the offending class
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    val params = Map("protobufClassName" -> protobufClassName)
+    new AnalysisException(
+      errorClass = "INVALID_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the PROTOBUF_DEPENDENCY_NOT_FOUND error for a dependency that could not be found.
+   *
+   * @param dependencyName name of the missing dependency
+   * @return the exception to throw; nothing is thrown here
+   */
+  def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+    val params = Map("dependencyName" -> dependencyName)
+    new AnalysisException(
+      errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND",
+      messageParameters = params)
+  }
+
+  /**
+   * Builds the INVALID_BYTE_STRING error, reported when a value was expected to be a
+   * ByteString but was something else.
+   *
+   * @param unsupported the offending value; may be null, in which case both the value and
+   *                    its class are reported as "null"
+   * @return the exception to throw; nothing is thrown here
+   */
+  def invalidByteStringFormatError(unsupported: Any): Throwable = {
+    // Handle null explicitly: calling toString/getClass on it would raise a
+    // NullPointerException and mask the error this method is meant to report.
+    val (rendered, className) = unsupported match {
+      case null => ("null", "null")
+      case other => (other.toString, other.getClass.toString)
+    }
+    new AnalysisException(
+      errorClass = "INVALID_BYTE_STRING",
+      messageParameters = Map(
+        "unsupported" -> rendered,
+        "class" -> className))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index f78ff23d269..b8febfffba1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2808,4 +2808,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       errorClass = "UNSUPPORTED_EMPTY_LOCATION",
       messageParameters = Map.empty)
   }
+
+  /**
+   * Builds the MALFORMED_PROTOBUF_MESSAGE error, reported when a malformed Protobuf message
+   * is detected during deserialization. The message names `FailFastMode` as the parse mode.
+   *
+   * @param e the underlying failure, attached as the cause
+   * @return the exception to throw; nothing is thrown here
+   */
+  def malformedProtobufMessageDetectedInMessageParsingError(e: Throwable): Throwable = {
+    val params = Map("failFastMode" -> FailFastMode.name)
+    new SparkException(
+      errorClass = "MALFORMED_PROTOBUF_MESSAGE",
+      messageParameters = params,
+      cause = e)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 8086a0e97ec..bed647ef49f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -227,7 +227,7 @@ class QueryCompilationErrorsSuite
       parameters = Map[String, String]())
   }
 
-  test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf interface") {
+  test("NO_UDF_INTERFACE: java udf class does not implement any udf interface") {
     val className = "org.apache.spark.sql.errors.MyCastToString"
     val e = intercept[AnalysisException](
       spark.udf.registerJava(
@@ -237,7 +237,7 @@ class QueryCompilationErrorsSuite
     )
     checkError(
       exception = e,
-      errorClass = "NO_UDF_INTERFACE_ERROR",
+      errorClass = "NO_UDF_INTERFACE",
       parameters = Map("className" -> className))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org