You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/22 03:08:50 UTC

[GitHub] [spark] SandishKumarHN opened a new pull request, #38344: [SPARK-40777][SQL][SPARK-PROTOBUF] : Protobuf import support and move error-classes.

SandishKumarHN opened a new pull request, #38344:
URL: https://github.com/apache/spark/pull/38344

   This is the follow-up PR to https://github.com/apache/spark/pull/37972 and https://github.com/apache/spark/pull/38212
   
   ### What changes were proposed in this pull request?
   1. Move spark-protobuf error classes to the spark error-classes framework(core/src/main/resources/error/error-classes.json). 
   2. Support protobuf imports
   3. validate protobuf timestamp and duration types.  
   
   
   ### Why are the changes needed?
   N/A
   
   ### Does this PR introduce _any_ user-facing change?
   None
   
   ### How was this patch tested?
   Existing tests should cover the validation of this PR.  
   
   CC: @rangadi @mposdev21 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002913803


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -244,7 +239,13 @@ private[sql] class ProtobufDeserializer(
       case (ENUM, StringType) =>
         (updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))
 
-      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case _ =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
+          toFieldStr(protoPath),
+          toFieldStr(catalystPath),
+          s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
+            s" ${protoType.getType}",
+          catalystType.sql)

Review Comment:
   done



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,183 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_2251",

Review Comment:
   makes sense made changes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006748193


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case PermissiveMode =>
         nullResultRow
       case FailFastMode =>
-        throw new SparkException(
-          "Malformed records are detected in record parsing. " +
-            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
-            "result, try setting the option 'mode' as 'PERMISSIVE'.",
-          e)
+        throw QueryCompilationErrors.malformedRecordsDetectedInRecordParsingError(e)
       case _ =>
-        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+        throw QueryCompilationErrors.parseModeUnsupportedError("from_protobuf", parseMode)

Review Comment:
   ditto: prettyName



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1004862763


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -644,6 +669,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName> <errorMessage>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object Error: <errorMessage>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor, Error: <errorMessage>"

Review Comment:
   Could you don't pass arbitrary `errorMessage`. In the future, `error-classes.json` might be translated to a local language. An error message in mixed languages won't look nice.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,189 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> sqlColumn,

Review Comment:
   It is an identifier, so, quote it by `toSQLId()`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,189 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: DataType): Throwable = {

Review Comment:
   Fix indentation, see https://github.com/databricks/scala-style-guide#spacing-and-indentation



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,189 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> sqlColumn,
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+    sqlColumn: String,
+    protobufColumn: String,
+    sqlType: DataType,
+    protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+    sqlColumn: String,
+    protobufColumn: String,
+    data: String,
+    enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+    protobufType: String,
+    sqlType: DataType,
+    e1: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(e1.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+    protobufType: String,
+    sqlType: DataType,
+    e1: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(e1.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+    descriptorName: String,
+    containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(
+    field: String,
+    protobufSchema: String,
+    matchSize: String,
+    matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(e1: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map("errorMessage" -> e1.getMessage()),
+      cause = Some(e1.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, e1: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(e1.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(e1: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map("errorMessage" -> e1.getMessage()),
+      cause = Some(e1.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(protobufClassName: String, errorMessage: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map(
+        "protobufClassName" -> protobufClassName,
+        "errorMessage" -> errorMessage))
+  }
+
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName))
+  }
+
+  def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DEPENDENCY_ERROR",
+      messageParameters = Map("dependencyName" -> dependencyName))
+  }
+
+  def invalidByteStringFormatError(): Throwable = {
+    new AnalysisException(errorClass = "INVALID_BYTE_STRING_ERROR", messageParameters = Map.empty)
+  }
+
+  def malformedRecordsDetectedInRecordParsingError(e1: Throwable): Throwable = {

Review Comment:
   nit: e1 -> cause



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,189 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> sqlColumn,
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+    sqlColumn: String,
+    protobufColumn: String,
+    sqlType: DataType,
+    protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006201944


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   That is not what I am referring to. This is not really a problem since it won't even compile. 
   
   Notice two files as input for `protoc` above. May be that will since `name` will include the path.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38344: [SPARK-40777][SQL][SPARK-PROTOBUF] : Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1287607239

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file jus tot import bunch of other file that define the protobufs. 
   
   What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files).
   
   I think fixing it properly would be the right thing. We can leave a TODO here, I can add that in a follow up. LMK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003632674


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi how about supporting imports? I made this [change](https://github.com/apache/spark/pull/38344/files#diff-97aac63266f3c60eef9bd8dd1b76be3a5bd77fe4d17fa6fa370f5e0d9428a0a9L172) of "Timestamp" and "Duration" to show a unit test for imports. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003653232


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi how about splitting Timestamp and Duration into our own .proto files and then importing them in the functions_suite.proto file? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003623154


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi just "messageClassName" param feature unit tests(Timestamp and Duration). The protoc plugin is only used for the test right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1303006260

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006028333


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,189 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> sqlColumn,
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+    sqlColumn: String,
+    protobufColumn: String,
+    sqlType: DataType,
+    protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,

Review Comment:
   @MaxGekk made changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006747706


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val parseMode: ParseMode = {
     val mode = protobufOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
-      throw new AnalysisException(unacceptableModeMessage(mode.name))
+      throw QueryCompilationErrors.parseModeUnsupportedError("from_protobuf", mode)

Review Comment:
   Use `prettyName` instead of "from_protobuf", please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1302961885

   > @MaxGekk are you still reviewing this? @SandishKumarHN is there any more review to be addressed? If we are ready, I can ask @HeartSaVioR to merge this (before his weekend starts in Seoul :) ).
   
   @rangadi I have addressed all of the review comments.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1010721467


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -178,46 +176,73 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
-    }
+    val fileDescriptor = parseFileDescriptorSet(descFilePath)
+      .find(!_.getMessageTypes.asScala.find(desc =>

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1010092402


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {

Review Comment:
   @rangadi makes sense to use find and return, fixed.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003651551


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   I think it is better to have import of our own proto files. Currently we don't have any. We can split some of them. I think I left a TODO about that in my PR. Many production proto files will have imports. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003729392


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =

Review Comment:
   could you some brief comments here?
   Is the last file the right file? 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =
+      fileDescriptorSet.getFileList.asScala.last
+
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment:
   Is this the import file list? What happens when the imported file imports other files? i.e. A imports B and B imports C. 



##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala:
##########
@@ -163,18 +163,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       fieldMatchType: MatchType,
       expectedCauseMessage: String,
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+    val e = intercept[Exception] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
 
     assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    if (e.getCause != null) {
+      assert(e.getCause.getMessage === expectedCauseMessage)

Review Comment:
   What does it mean if `e.getCause` is null and `expectedCauseMessage` is not?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -62,14 +63,18 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE
+        if fd.getMessageType.getName == "Duration" &&
+        fd.getMessageType.getFields.size() == 2 &&
+        fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+        fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
         Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
-        Some(TimestampType)
-        // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
-        //        expected to be TimestampType in Spark. How about verifying fields?
-        //        Same for "Duration". Only the Timestamp & Duration protos defined in
-        //        google.protobuf package should default to corresponding Catalylist types.
+      case MESSAGE
+        if fd.getMessageType.getName == "Timestamp" &&
+        fd.getMessageType.getFields.size() == 2 &&

Review Comment:
   Could move all the conditions into braces (a & b & ...)? 



##########
connector/protobuf/src/test/resources/protobuf/timestamp.proto:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "TimestampProto";
+
+message Timestamp {
+  int64 seconds = 1;
+  int32 nanos = 2;
+}

Review Comment:
   nit: Fix end-of-line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003983288


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,27 +194,31 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =
+      fileDescriptorSet.getFileList.asScala.last
+
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment:
   @rangadi some changes were made to support nested imports. Reading protobuf descriptors from the bottom up, the last element in FileDescriptorSet is the initial FileDescriptorProto, from which we will continue to find more FileDescriptors recursively.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003984608


##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala:
##########
@@ -163,18 +163,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       fieldMatchType: MatchType,
       expectedCauseMessage: String,
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+    val e = intercept[Exception] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
 
     assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    if (e.getCause != null) {
+      assert(e.getCause.getMessage === expectedCauseMessage)

Review Comment:
   made changes to support new error class frameworks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002913637


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi this protoc maven option is equal to CLI "protoc --include_imports" and I have added validations to protobuf Timestamp and Duration types. and currently, maven protoc does not generate classes for google protobuf types. sbt is enabled by default.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006128712


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   Btw, What happens if there are two different files with the same name? E.g. 
   `protoc ... a/b/events.proto x/y/events.proto`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1288323940

   cc @MaxGekk FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008467821


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -792,6 +887,21 @@
       "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>"
     ]
   },
+  "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE" : {
+    "message" : [
+      "Unable to convert SQL type <toType> to Protobuf type <protobufType>."
+    ]
+  },
+  "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR" : {

Review Comment:
   @srielau fixed all the error class names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srielau commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
srielau commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008327776


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -23,6 +23,11 @@
     ],
     "sqlState" : "42000"
   },
+  "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR" : {

Review Comment:
   Definitely remove _ERROR.
   Would be nice if we could gravitate towards *_NOT_FOUND and *_ALREADY_EXIST style naming convention:
   PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -65,6 +70,11 @@
     ],
     "sqlState" : "22005"
   },
+  "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR" : {

Review Comment:
   I don't think we call out Catalyst anywhere else. How is that relevant? Can we be generic?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -65,6 +70,11 @@
     ],
     "sqlState" : "22005"
   },
+  "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR" : {
+    "message" : [
+      "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because <data> cannot be written since it's not defined in ENUM <enumString>"

Review Comment:
   That's a because "squared"... I don't have enough context to propose a rephrase..
   Also If we have SQL here, maybe we can do CATALYST -> SQL for the name? 



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -579,6 +594,11 @@
       }
     }
   },
+  "MALFORMED_PROTOBUF_MESSAGE_ERROR" : {

Review Comment:
   Remove _ERROR



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -629,11 +649,21 @@
     ],
     "sqlState" : "42000"
   },
+  "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA" : {
+    "message" : [
+      "Cannot find <catalystFieldPath> in Protobuf schema"
+    ]
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
     ]
   },
+  "NO_PROTOBUF_MESSAGE_TYPE_ERROR" : {

Review Comment:
   Remove _ERROR



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {

Review Comment:
   ```suggestion
     "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : {
   ```
   
   Any hope we know why and can tell? This just says: "Bad Kitty"



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {

Review Comment:
   ```suggestion
     "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : {
   ```
   Same as above.. Not sure how helpful this message is.



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {

Review Comment:
   ```suggestion
     "RECURSIVE_PROTOBUF_SCHEMA" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -760,6 +850,11 @@
     ],
     "sqlState" : "22023"
   },
+  "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR" : {

Review Comment:
   ```suggestion
     "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_TYPE" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -792,6 +887,21 @@
       "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>"
     ]
   },
+  "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE" : {
+    "message" : [
+      "Unable to convert SQL type <toType> to Protobuf type <protobufType>."
+    ]
+  },
+  "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR" : {

Review Comment:
   ```suggestion
     "PROTOBUF_MESSAGE_NOT_FOUND" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -629,11 +649,21 @@
     ],
     "sqlState" : "42000"
   },
+  "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA" : {

Review Comment:
   CATALYST -> SQL?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -517,6 +527,11 @@
       "Invalid bucket file: <path>"
     ]
   },
+  "INVALID_BYTE_STRING_ERROR" : {
+    "message" : [
+      "Invalid ByteString format"

Review Comment:
   Can we spell out the invalid format?
   For extra points: Can we spell out a valid format?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -629,11 +649,21 @@
     ],
     "sqlState" : "42000"
   },
+  "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA" : {
+    "message" : [
+      "Cannot find <catalystFieldPath> in Protobuf schema"
+    ]
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
     ]
   },
+  "NO_PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "No MessageTypes returned, <descriptorName>"
+    ]
+  },
   "NO_UDF_INTERFACE_ERROR" : {

Review Comment:
   Slipped through the cracks... remove here or separate PR.



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -517,6 +527,11 @@
       "Invalid bucket file: <path>"
     ]
   },
+  "INVALID_BYTE_STRING_ERROR" : {

Review Comment:
   Remove _ERROR



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {

Review Comment:
   ```suggestion
     "PROTOBUF_DEPENDENCY_NOT_FOUND" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {

Review Comment:
   ```suggestion
     "CANNOT_LOAD_PROTOBUF_CLASS" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {

Review Comment:
   ```suggestion
     "INVALID_PROTOBUF_MESSAGE_TYPE" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {

Review Comment:
   ```suggestion
     "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {

Review Comment:
   ```suggestion
     "PROTOBUF_FIELD_MISSING" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {

Review Comment:
   ```suggestion
     "PROTOBUF_TYPE_NOT_SUPPORT" : {
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   How is this different from eh one above? Just more info available?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -792,6 +887,21 @@
       "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>"
     ]
   },
+  "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE" : {

Review Comment:
   Same as above?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   ```suggestion
     "CANNOT_CONVERT_PROTOBUF_TYPE_TO_SQL_TYPE" : {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1012398593


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -29,12 +44,22 @@
     ],
     "sqlState" : "22007"
   },
+  "CANNOT_LOAD_PROTOBUF_CLASS" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"

Review Comment:
   @srielau fixed the suggested changes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1010672856


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -178,46 +176,73 @@ private[sql] object ProtobufUtils extends Logging {
   }
 
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
-    }
+    val fileDescriptor = parseFileDescriptorSet(descFilePath)
+      .find(!_.getMessageTypes.asScala.find(desc =>

Review Comment:
   Oh, we need to find() invocations and check is made twice. How about this? [Optional to do, only a suggestion]
   
        // Find the first message descriptor that matches the name.
        val descriptorOpt =  parseFileDescriptorSet(descFilePath)
            .flatMap { fileDesc => 
                   fileDesc.getMessageTypes.asScala.find { desc => 
                         desc.getName == messageName || desc.getFullName == messageName
                   }
            }.headOption
   
         descriptoOpt match {
            case Some(d) => d
            case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
        } 
         
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file jus tot import bunch of other file that define the protobufs. 
   
   What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files).
   
   I think fixing it properly would be the right thing. We can level a TODO here, I can add that in a follow up. LMK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003686011


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi made changes same. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes. 
URL: https://github.com/apache/spark/pull/38344


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008228337


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008463500


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -65,6 +70,11 @@
     ],
     "sqlState" : "22005"
   },
+  "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR" : {
+    "message" : [
+      "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because <data> cannot be written since it's not defined in ENUM <enumString>"

Review Comment:
   @srielau there is no ENUM type in spark, so we convert the protobuf enum to spark string.  so we are trying to throw an enum-specific error here.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003630603


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   That's because this removes existing 'Timestamp' message : https://github.com/apache/spark/pull/38344/files#diff-97aac63266f3c60eef9bd8dd1b76be3a5bd77fe4d17fa6fa370f5e0d9428a0a9L172
   
   We could undo that change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1004757693


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()
+    for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) {
+      if (!fileDescriptorProtoIndex.contains(dependencyName)) {
+        throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName)
+      }
+      val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get
+      fileDescriptorList = fileDescriptorList ++ List(
+        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex))
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   * @param fileDescriptorSet

Review Comment:
   Same for _param_ and _return_ here.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()
+    for (dependencyName <- fileDescriptorProto.getDependencyList().asScala) {
+      if (!fileDescriptorProtoIndex.contains(dependencyName)) {
+        throw QueryCompilationErrors.protobufDescriptorDependencyError(dependencyName)
+      }
+      val dependencyProto: FileDescriptorProto = fileDescriptorProtoIndex.get(dependencyName).get
+      fileDescriptorList = fileDescriptorList ++ List(
+        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex))
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   * @param fileDescriptorSet
+   * @return Map[String, FileDescriptorProto]
+   */
+  private def createDescriptorProtoIndex(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    var resultBuilder = Map[String, FileDescriptorProto]()

Review Comment:
   style-nit : Similarly here here:
   
        fileDescriptorSet.getFileList().asScala.map {
              descriptorProto.getName() -> descriptorProto
         }.toMap()
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    var fileDescriptorList = List[Descriptors.FileDescriptor]()

Review Comment:
   style nit: More scala idiomatic:
   
       val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => 
            fileDescriptorProtoIndex.get(dependency) match {
                case Some(dependencyProto) => 
                        buildFileDescriptor(dependencyProto, fileDescriptorProtoIndex)
                case None => 
                       throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
            }
       }
   
      
   
     
                    
   
   



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto
+   * @param descriptorProtoIndex
+   * @return Descriptors.FileDescriptor
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoIndex: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {

Review Comment:
   minor: Replace 'Index' with 'Map'?



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,28 +195,58 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoIndex(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
+    }
+  }
+
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   * @param descriptorProto

Review Comment:
   nit: Can remove parameters here if we are not adding any description (given these are internal APIs).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003984608


##########
connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala:
##########
@@ -163,18 +163,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       fieldMatchType: MatchType,
       expectedCauseMessage: String,
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[IncompatibleSchemaException] {
+    val e = intercept[Exception] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
 
     assert(e.getMessage === expectMsg)
-    assert(e.getCause.getMessage === expectedCauseMessage)
+    if (e.getCause != null) {
+      assert(e.getCause.getMessage === expectedCauseMessage)

Review Comment:
   made changes to support new error class framework. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006190485


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   @rangadi protoc command would throw the below error, protoc won't allow duplicate imports. 
   
   ```
   protoc --java_out=./ functions_suite.proto
   basicmessage_1.proto:30:9: "org.apache.spark.sql.protobuf.protos.BasicMessage.id" is already defined in file "basicmessage.proto".
   basicmessage_1.proto:31:10: "org.apache.spark.sql.protobuf.protos.BasicMessage.string_value" is already defined in file "basicmessage.proto".
   basicmessage_1.proto:32:9: "org.apache.spark.sql.protobuf.protos.BasicMessage.int32_value" is already defined in file "basicmessage.proto".
   basicmessage_1.proto:33:9: "org.apache.spark.sql.protobuf.protos.BasicMessage.int64_value" is already defined in file "basicmessage.proto".
   basicmessage_1.proto:34:10: "org.apache.spark.sql.protobuf.protos.BasicMessage.double_value" is already defined in file "basicmessage.proto".
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417281


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   this gives which protobuf field had failed to convert, this clearly describes the field name and field type. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417312


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   this gives which protobuf message had failed to convert, it would be the protobuf root name since the protobuf message might have a long list of fields, just keeping the root name. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008467248


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {

Review Comment:
   @srielau Along with the message added the pathname of the descriptor file and the same for the one below. Aside from the error message, we don't have much information at this time. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srielau commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
srielau commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1012385639


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -742,6 +832,11 @@
     ],
     "sqlState" : "22023"
   },
+  "SQL_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR" : {

Review Comment:
   Rename without _ERROR



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3212,4 +3212,181 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: Seq[String],
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_NOT_FOUND",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+      messageParameters = Map.empty("descFilePath" -> descFilePath),
+      cause = Option(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Option(cause.getCause))
+  }
+
+  def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+      messageParameters = Map.empty("descFilePath" -> descFilePath),
+      cause = Option(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "RECURSIVE_PROTOBUF_SCHEMA",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes package prefix."
+    new AnalysisException(
+      errorClass = "CANNOT_LOAD_PROTOBUF_CLASS",
+      messageParameters = Map("protobufClassName" -> protobufClassName, "message" -> message),

Review Comment:
   We don't want text as a parameter. Aside to make this text depends on whether there is any dot seems to be quite subtle. Maybe it simply doesn't have enough dots?
   I propose to move this text into the error message (unconditionally).



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -29,12 +44,22 @@
     ],
     "sqlState" : "22007"
   },
+  "CANNOT_LOAD_PROTOBUF_CLASS" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"

Review Comment:
   ```suggestion
         "Could not load Protobuf class with name <protobufClassName>. Ensure the class name includes package prefix."
   ```



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -688,6 +733,51 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_DEPENDENCY_NOT_FOUND" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" : {
+    "message" : [
+      "Error reading Protobuf descriptor file at path: <filePath>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   How about:
   ```suggestion
     "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1302907015

   @MaxGekk are you still reviewing this? 
   @SandishKumarHN is there any more review to be addressed? If we are ready, I can ask @HeartSaVioR to merge this (before his weekend starts in Seoul :) ).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417281


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   this is thrown when failing to convert protobuf fields into SQL type. 



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   this is thrown when failing to convert the protobuf message to SQL struct type. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006187850


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   @rangadi I removed this option on purpose; since the user would pass the main descriptor file as a parameter, we should just look at the main descriptor. What if the user has the same MessageName in multiple .proto files? Which one should we read?
   
   the descriptor structure is first imports, then message definition, with this assumption "last" would be the main message definition. 
   
   how about looking at the main message definition first, if we don't find we can look at the "imports" return as soon as the messagename matches.?     
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1007058916


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala:
##########
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case PermissiveMode =>
         nullResultRow
       case FailFastMode =>
-        throw new SparkException(
-          "Malformed records are detected in record parsing. " +
-            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
-            "result, try setting the option 'mode' as 'PERMISSIVE'.",
-          e)
+        throw QueryCompilationErrors.malformedRecordsDetectedInRecordParsingError(e)
       case _ =>
-        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+        throw QueryCompilationErrors.parseModeUnsupportedError("from_protobuf", parseMode)

Review Comment:
   @MaxGekk fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on PR #38344:
URL: https://github.com/apache/spark/pull/38344#issuecomment-1291440502

   @HyukjinKwon getting below error in build any idea? nothing related PR
   
   ```
   File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
       return _bootstrap._gcd_import(name[level:], package, level)
     File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
     File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
     File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
     File "<frozen importlib._bootstrap>", line [68](https://github.com/SandishKumarHN/spark/actions/runs/3325103283/jobs/5498895243#step:17:69)0, in _load_unlocked
     File "/usr/local/lib/python3.9/dist-packages/_pytest/assertion/rewrite.py", line 168, in exec_module
       exec(co, module.__dict__)
     File "/usr/local/lib/python3.9/dist-packages/pytest_mypy_plugins/collect.py", line 13, in <module>
       from py._path.local import LocalPath
   ModuleNotFoundError: No module named 'py._path'; 'py' is not a package
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003984452


##########
connector/protobuf/src/test/resources/protobuf/timestamp.proto:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "TimestampProto";
+
+message Timestamp {
+  int64 seconds = 1;
+  int32 nanos = 2;
+}

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003632674


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   @rangadi how about supporting imports? I made this [change](https://github.com/apache/spark/pull/38344/files#diff-97aac63266f3c60eef9bd8dd1b76be3a5bd77fe4d17fa6fa370f5e0d9428a0a9L172) of "Timestamp" and "Duration" to show a unit test for imports. even if we undo the above change, a unit test for import case needs <includeMavenTypes> plugin change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008034311


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes package prefix."
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName, "message" -> message),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName))
+  }
+
+  def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DEPENDENCY_ERROR",
+      messageParameters = Map("dependencyName" -> dependencyName))
+  }
+
+  def invalidByteStringFormatError(): Throwable = {
+    new AnalysisException(errorClass = "INVALID_BYTE_STRING_ERROR", messageParameters = Map.empty)
+  }
+
+  def malformedRecordsDetectedInRecordParsingError(cause: Throwable): Throwable = {

Review Comment:
   Is it a compilation error not runtime (execution) one?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala:
##########
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
           duration.build()
 
       case _ =>
-        throw new IncompatibleSchemaException(
-          errorPrefix +
-            s"schema is incompatible (sqlType = ${catalystType.sql}, " +
-            s"protoType = ${fieldDescriptor.getJavaType})")
+        throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+          toFieldStr(catalystPath),

Review Comment:
   Please, don't quote it twice if you do that inside of `cannotConvertCatalystTypeToProtobufTypeError()` by `toSQLId`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,

Review Comment:
   If it is a column id, wrap it by `toSQLId()`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes package prefix."
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName, "message" -> message),
+      cause = Some(cause.getCause))

Review Comment:
   `getCause()` can return `null`, correct. If so, could you wrap it by `Option(cause.getCause)`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008228857


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes package prefix."
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName, "message" -> message),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName))
+  }
+
+  def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DEPENDENCY_ERROR",
+      messageParameters = Map("dependencyName" -> dependencyName))
+  }
+
+  def invalidByteStringFormatError(): Throwable = {
+    new AnalysisException(errorClass = "INVALID_BYTE_STRING_ERROR", messageParameters = Map.empty)
+  }
+
+  def malformedRecordsDetectedInRecordParsingError(cause: Throwable): Throwable = {

Review Comment:
   it is runtime, moved to ExecutionError class. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1014644624


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3212,4 +3212,179 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: Seq[String],
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_NOT_FOUND",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+      messageParameters = Map.empty("descFilePath" -> descFilePath),
+      cause = Option(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Option(cause.getCause))
+  }
+
+  def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+      messageParameters = Map.empty("descFilePath" -> descFilePath),

Review Comment:
   @SandishKumarHN Please, fix this. Map.empty -> Map



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3212,4 +3212,179 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: Seq[String],
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: Seq[String],
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Option(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_NOT_FOUND",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+      messageParameters = Map.empty("descFilePath" -> descFilePath),

Review Comment:
   Accidentally found this. Is `messageParameters` set to an empty map, if so, this will fail in runtime w/ an internal error. Could you write a test which trigger the error and fix the bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][SPARK-PROTOBUF] : Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002318758


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -62,14 +63,16 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE if fd.getMessageType.getName == "Duration" &&

Review Comment:
   @rangadi My assumption is that users should be able to use the Timestamp and Duration message types with different fields. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,27 +195,32 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
 
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
+    val descriptorProto: DescriptorProtos.FileDescriptorProto =

Review Comment:
   @rangadi This handles the protobuf import; please let me know if you know of a better approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] MaxGekk commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002880566


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,183 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+    protobufColumn: String,
+    sqlColumn: String,
+    protobufType: String,
+    sqlType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_2251",

Review Comment:
   We use the prefix `_LEGACY_ERROR_TEMP_` to convert existing exception to error classes but for new exception, please, assign appropriate names to error classes. 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala:
##########
@@ -244,7 +239,13 @@ private[sql] class ProtobufDeserializer(
       case (ENUM, StringType) =>
         (updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))
 
-      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case _ =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
+          toFieldStr(protoPath),
+          toFieldStr(catalystPath),
+          s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
+            s" ${protoType.getType}",
+          catalystType.sql)

Review Comment:
   Pass `catalystType` into `cannotConvertProtobufTypeToSqlTypeError()`, and use `toSQLType` to quote the type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1002893438


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   Could you add comment here?
   Is this for protos like `Timestamp' ? Looks like our code handles any Timestamp defined similar com.google.protobuf.Timestamp. Ok to not to include these. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003619801


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   I see. What would fail if you remove this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1003664791


##########
connector/protobuf/pom.xml:
##########
@@ -123,6 +123,7 @@
             <configuration>
               <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
               <protocVersion>${protobuf.version}</protocVersion>
+              <includeMavenTypes>direct</includeMavenTypes>

Review Comment:
   Yep. That would be nice. That will mimic normal proto files organization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006187850


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   @rangadi I removed this option on purpose; since the user would pass the main descriptor file as a parameter, we should just look at the main descriptor. What if the user has the same MessageName in multiple .proto files? Which one should I read?
   
   the descriptor structure is first imports, then message definition, with this assumption "last" would be the main message definition. 
   
   how about looking at the main message definition first, if don't find we can look at the "imports" return as soon as the messagename matches.?     
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006324273


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports

Review Comment:
   What is missing? Looks fairly complete to me.
   Better to state the problem here.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {
+      fileDescriptor.getMessageTypes.asScala.find { desc =>
+        desc.getName == messageName || desc.getFullName == messageName
+      }
+    }).filter(f => !f.isEmpty)
+
+    if (descriptorList.isEmpty) {
+      throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName)
     }
 
-    descriptor match {
+    descriptorList.last match {
       case Some(d) => d
       case None =>
-        throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor")
+        throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
     }
   }
 
-  private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
+  private def parseFileDescriptor(descFilePath: String): List[Descriptors.FileDescriptor] = {

Review Comment:
   Rename to `parseFileDescriptorSet` (otherwise it sounds like it is parsing just one file descriptor). 



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -175,19 +175,26 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {
+      fileDescriptor.getMessageTypes.asScala.find { desc =>
+        desc.getName == messageName || desc.getFullName == messageName
+      }
+    }).filter(f => !f.isEmpty)
+
+    if (descriptorList.isEmpty) {
+      throw QueryCompilationErrors.noProtobufMessageTypeReturnError(messageName)
     }
 
-    descriptor match {
+    descriptorList.last match {

Review Comment:
   Could you add a comment on why we are picking the last one? Will be useful for future readers as well.



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -177,47 +175,75 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
+  // TODO: Revisit to ensure that messageName is searched through all imports
   def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
-    val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
-      desc.getName == messageName || desc.getFullName == messageName
+    val descriptorList = parseFileDescriptor(descFilePath).map(fileDescriptor => {

Review Comment:
   Style: use `find()` rather than map().filter(). 
   
   (you can use `findLast()` if there is a reason to use the last match). 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006195517


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   The command is using only single proto file. Not the same as the example above, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1004099495


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##########
@@ -62,14 +63,18 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" =>
+      case MESSAGE
+        if fd.getMessageType.getName == "Duration" &&
+        fd.getMessageType.getFields.size() == 2 &&
+        fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+        fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
         Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
-        Some(TimestampType)
-        // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
-        //        expected to be TimestampType in Spark. How about verifying fields?
-        //        Same for "Duration". Only the Timestamp & Duration protos defined in
-        //        google.protobuf package should default to corresponding Catalylist types.
+      case MESSAGE
+        if fd.getMessageType.getName == "Timestamp" &&
+        fd.getMessageType.getFields.size() == 2 &&

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006150396


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   There is one more related issue here: We can find messages defined in this file, but not the message defined inside the imported file. 
   
   I think it is better to fix all of this properly, since you are already working on this. 
   I.e. we should be able to use any message defined in the descriptor file. 
   Essentially we can build `List [ FileDescriptor ]` and search for user's message in all of those. We could give preference for `last` if in this search order.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006192940


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   I think that is pretty restrictive. There will be cases where user wants to use the protobuf defined in imported files. E.g. they might have one proto file just to import bunch of other file that define actual protos used.
   
   What are we gaining by having restriction to allow messages only in the last listed file? In addition there could be be multiple top level files (i.e. `protoc` command is run with multiple files).
   
   I think fixing it properly would be the right thing. We can leave a TODO here, I can add that in a follow up. LMK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006187850


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   @rangadi I removed this option on purpose; since the user would pass the main descriptor file as a parameter, we should just look at the main descriptor. What if the user has the same MessageName in multiple .proto files? Which one should we read?
   
   the descriptor structure is first imports, then message definition, with this assumption "last" would be the main message definition. 
   
   how about looking at the main message definition first, if don't find we can look at the "imports" return as soon as the messagename matches.?     
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006135653


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   Could you add comment about why `last` is used here? What happens if we use `first`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006211573


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   @rangadi Yes, protoc allowed two .proto files from different paths, but the last .proto file overrides the first one if there are any conflicts. but It is entirely up to the user to determine how to create a .desc file right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006277470


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)

Review Comment:
   @rangadi I just added a basic working method for searching messageName through all imports. and left a TODO to revisit. and also made changes to unit tests(ProtobufFunctionsSuite) to use a single .desc file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008228200


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala:
##########
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
           duration.build()
 
       case _ =>
-        throw new IncompatibleSchemaException(
-          errorPrefix +
-            s"schema is incompatible (sqlType = ${catalystType.sql}, " +
-            s"protoType = ${fieldDescriptor.getJavaType})")
+        throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+          toFieldStr(catalystPath),

Review Comment:
   okay, will call toSQLType inside the error class, avoid using toFieldStr for sqlType



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417312


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   this is thrown at protobuf message level



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   this is thrown at protobuf field level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417281


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {
+    "message" : [
+      "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR" : {

Review Comment:
   this gives which protobuf field had failed to convert



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   this gives which protobuf message had failed to convert 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008417312


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -706,6 +736,66 @@
     ],
     "sqlState" : "42000"
   },
+  "PROTOBUF_CLASS_LOAD_ERROR" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName><message>"
+    ]
+  },
+  "PROTOBUF_DEPENDENCY_ERROR" : {
+    "message" : [
+      "Could not find dependency: <dependencyName>"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_ERROR" : {
+    "message" : [
+      "Error parsing descriptor byte[] into Descriptor object"
+    ]
+  },
+  "PROTOBUF_DESCRIPTOR_PARSING_ERROR" : {
+    "message" : [
+      "Error constructing FileDescriptor"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_ERROR" : {
+    "message" : [
+      "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+    ]
+  },
+  "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA" : {
+    "message" : [
+      "Found <field> in Protobuf schema but there is no match in the SQL schema"
+    ]
+  },
+  "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "PROTOBUF_MESSAGE_TYPE_ERROR" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "PROTOBUF_RECURSION_ERROR" : {
+    "message" : [
+      "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+    ]
+  },
+  "PROTOBUF_TYPE_NOT_SUPPORT_ERROR" : {
+    "message" : [
+      "Protobuf type not yet supported: <protobufType>."
+    ]
+  },
+  "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR" : {

Review Comment:
   this is gives which message had failed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38344: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes.

Posted by GitBox <gi...@apache.org>.
SandishKumarHN commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1006197896


##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala:
##########
@@ -196,30 +194,52 @@ private[sql] object ProtobufUtils extends Logging {
       fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
     } catch {
       case ex: InvalidProtocolBufferException =>
-        // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
-        throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+        throw QueryCompilationErrors.descrioptorParseError(ex)
       case ex: IOException =>
-        throw new RuntimeException(
-          "Error reading Protobuf descriptor file at path: " +
-            descFilePath,
-          ex)
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
     }
-
-    val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        new Array[Descriptors.FileDescriptor](0))
+      val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        buildFileDescriptor(fileDescriptorSet.getFileList.asScala.last, fileDescriptorProtoIndex)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
-        throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
+        throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
       fileDescriptor
     } catch {
       case e: Descriptors.DescriptorValidationException =>
-        throw new RuntimeException("Error constructing FileDescriptor", e)
+        throw QueryCompilationErrors.failedParsingDescriptorError(e)
     }
   }
 
+  /**
+   * Recursively constructs file descriptors for all dependencies for given
+   * FileDescriptorProto and return.
+   */
+  private def buildFileDescriptor(
+    fileDescriptorProto: FileDescriptorProto,
+    fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+    val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+      fileDescriptorProtoMap.get(dependency) match {
+        case Some(dependencyProto) =>
+          buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+        case None =>
+          throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+      }
+    }
+    Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+  }
+
+  /**
+   * Returns a map from descriptor proto name as found inside the descriptors to protos.
+   */
+  private def createDescriptorProtoMap(
+    fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+    fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+      descriptorProto.getName() -> descriptorProto

Review Comment:
   @rangadi yeah correct but functions_suite.proto is including both basicmessage.proto and basicmessage_1.proto, wouldn't it equal the same as above? 
   ```
   import "timestamp.proto";
   import "duration.proto";
   import "basicmessage.proto";
   import "basicmessage_1.proto";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org