Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/04 05:50:29 UTC
[spark] branch master updated: [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5741d38ee27 [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
5741d38ee27 is described below
commit 5741d38ee272418a919fe7d102514c221a6e741a
Author: SandishKumarHN <sa...@gmail.com>
AuthorDate: Fri Nov 4 14:50:12 2022 +0900
[SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
This is a follow-up PR to https://github.com/apache/spark/pull/37972 and https://github.com/apache/spark/pull/38212
### What changes were proposed in this pull request?
1. Move spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support Protobuf imports (see the usage sketch below).
3. Validate Protobuf timestamp and duration types.
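A minimal usage sketch for the import support, assuming a descriptor set generated with protoc --include_imports (the payload and variable names below are illustrative; the from_protobuf/to_protobuf calls mirror the wrappers exercised in ProtobufFunctionsSuite):

    import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}
    import spark.implicits._

    // functions_suite.desc is built with --include_imports, so it also carries
    // basicmessage.proto, nestedenum.proto, timestamp.proto and duration.proto.
    val descFilePath = "connector/protobuf/src/test/resources/protobuf/functions_suite.desc"

    val df = Seq(serializedBasicMessage).toDF("value") // Array[Byte] of a BasicMessage
    val parsed = df.select(from_protobuf($"value", "BasicMessage", descFilePath) as "sample")
    val roundTrip = parsed.select(to_protobuf($"sample", "BasicMessage", descFilePath) as "value")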
### Why are the changes needed?
N/A
### Does this PR introduce _any_ user-facing change?
None
### How was this patch tested?
Existing tests should cover the validation of this PR.
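For illustration, the migrated error classes are asserted with the structured checkError helper rather than raw message strings; a condensed sketch of the pattern used in ProtobufSerdeSuite below:

    val e = intercept[AnalysisException] {
      serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
    }
    checkError(
      exception = e,
      errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
      parameters = Map(
        "protobufType" -> "MissMatchTypeInRoot",
        "toType" -> toSQLType(CATALYST_STRUCT)))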
CC: rangadi mposdev21 gengliangwang
Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.
Authored-by: SandishKumarHN <sa...@gmail.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/protobuf/ProtobufDataToCatalyst.scala | 22 +--
.../spark/sql/protobuf/ProtobufDeserializer.scala | 31 ++--
.../spark/sql/protobuf/ProtobufSerializer.scala | 38 +++--
.../spark/sql/protobuf/utils/ProtobufUtils.scala | 117 ++++++++------
.../sql/protobuf/utils/SchemaConverters.scala | 32 ++--
.../src/test/resources/protobuf/basicmessage.proto | 39 +++++
.../test/resources/protobuf/catalyst_types.proto | 2 -
.../src/test/resources/protobuf/duration.proto | 26 +++
.../test/resources/protobuf/functions_suite.desc | Bin 5958 -> 6678 bytes
.../test/resources/protobuf/functions_suite.proto | 35 ++--
.../src/test/resources/protobuf/nestedenum.proto | 28 ++++
.../src/test/resources/protobuf/timestamp.proto | 26 +++
.../ProtobufCatalystDataConversionSuite.scala | 2 +-
.../sql/protobuf/ProtobufFunctionsSuite.scala | 35 ++--
.../spark/sql/protobuf/ProtobufSerdeSuite.scala | 85 +++++++---
core/src/main/resources/error/error-classes.json | 107 ++++++++++++-
project/SparkBuild.scala | 2 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 177 ++++++++++++++++++++-
.../spark/sql/errors/QueryExecutionErrors.scala | 8 +
.../sql/errors/QueryCompilationErrorsSuite.scala | 4 +-
20 files changed, 625 insertions(+), 191 deletions(-)
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index cad2442f10c..c0997b1bd06 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -21,11 +21,10 @@ import scala.util.control.NonFatal
import com.google.protobuf.DynamicMessage
-import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
@transient private lazy val parseMode: ParseMode = {
val mode = protobufOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
- throw new AnalysisException(unacceptableModeMessage(mode.name))
+ throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, mode)
}
mode
}
- private def unacceptableModeMessage(name: String): String = {
- s"from_protobuf() doesn't support the $name mode. " +
- s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
- }
-
@transient private lazy val nullResultRow: Any = dataType match {
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
case PermissiveMode =>
nullResultRow
case FailFastMode =>
- throw new SparkException(
- "Malformed records are detected in record parsing. " +
- s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
- "result, try setting the option 'mode' as 'PERMISSIVE'.",
- e)
+ throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
case _ =>
- throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+ throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, parseMode)
}
}
@@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
case Some(number) =>
// Unknown fields contain a field with same number as a known field. Must be due to
// mismatch of schema between writer and reader here.
- throw new IllegalArgumentException(s"Type mismatch encountered for field:" +
- s" ${messageDescriptor.getFields.get(number)}")
+ throw QueryCompilationErrors.protobufFieldTypeMismatchError(
+ messageDescriptor.getFields.get(number).toString)
case None =>
}
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index 0403b741ebf..46366ba268b 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -22,6 +22,7 @@ import com.google.protobuf.{ByteString, DynamicMessage, Message}
import com.google.protobuf.Descriptors._
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
@@ -29,7 +30,6 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoMatchedField
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.toFieldStr
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -61,10 +61,10 @@ private[sql] class ProtobufDeserializer(
}
}
} catch {
- case ise: IncompatibleSchemaException =>
- throw new IncompatibleSchemaException(
- s"Cannot convert Protobuf type ${rootDescriptor.getName} " +
- s"to SQL type ${rootCatalystType.sql}.",
+ case ise: AnalysisException =>
+ throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError(
+ rootDescriptor.getName,
+ rootCatalystType,
ise)
}
@@ -152,11 +152,6 @@ private[sql] class ProtobufDeserializer(
catalystType: DataType,
protoPath: Seq[String],
catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
- val errorPrefix = s"Cannot convert Protobuf ${toFieldStr(protoPath)} to " +
- s"SQL ${toFieldStr(catalystPath)} because "
- val incompatibleMsg = errorPrefix +
- s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
- s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"
(protoType.getJavaType, catalystType) match {
@@ -175,8 +170,9 @@ private[sql] class ProtobufDeserializer(
case (INT, ShortType) =>
(updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])
- case (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
- ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
+ case (
+ BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
+ ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)
case (LONG, LongType) =>
@@ -199,7 +195,8 @@ private[sql] class ProtobufDeserializer(
(updater, ordinal, value) =>
val byte_array = value match {
case s: ByteString => s.toByteArray
- case _ => throw new Exception("Invalid ByteString format")
+ case unsupported =>
+ throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
}
updater.set(ordinal, byte_array)
@@ -244,7 +241,13 @@ private[sql] class ProtobufDeserializer(
case (ENUM, StringType) =>
(updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))
- case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+ case _ =>
+ throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
+ toFieldStr(protoPath),
+ catalystPath,
+ s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
+ s" ${protoType.getType}",
+ catalystType)
}
}
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
index 5d9af92c5c0..0f87c640b19 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
@@ -23,13 +23,14 @@ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.{toFieldStr, ProtoMatchedField}
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.types._
/**
@@ -53,10 +54,10 @@ private[sql] class ProtobufSerializer(
newStructConverter(st, rootDescriptor, Nil, Nil).asInstanceOf[Any => Any]
}
} catch {
- case ise: IncompatibleSchemaException =>
- throw new IncompatibleSchemaException(
- s"Cannot convert SQL type ${rootCatalystType.sql} to Protobuf type " +
- s"${rootDescriptor.getName}.",
+ case ise: AnalysisException =>
+ throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError(
+ rootDescriptor.getName,
+ rootCatalystType,
ise)
}
if (nullable) { (data: Any) =>
@@ -77,8 +78,6 @@ private[sql] class ProtobufSerializer(
fieldDescriptor: FieldDescriptor,
catalystPath: Seq[String],
protoPath: Seq[String]): Converter = {
- val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " +
- s"to Protobuf ${toFieldStr(protoPath)} because "
(catalystType, fieldDescriptor.getJavaType) match {
case (NullType, _) =>
(getter, ordinal) => null
@@ -104,10 +103,11 @@ private[sql] class ProtobufSerializer(
(getter, ordinal) =>
val data = getter.getUTF8String(ordinal).toString
if (!enumSymbols.contains(data)) {
- throw new IncompatibleSchemaException(
- errorPrefix +
- s""""$data" cannot be written since it's not defined in enum """ +
- enumSymbols.mkString("\"", "\", \"", "\""))
+ throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError(
+ catalystPath,
+ toFieldStr(protoPath),
+ data,
+ enumSymbols.mkString("\"", "\", \"", "\""))
}
fieldDescriptor.getEnumType.findValueByName(data)
case (StringType, STRING) =>
@@ -124,7 +124,8 @@ private[sql] class ProtobufSerializer(
case (TimestampType, MESSAGE) =>
(getter, ordinal) =>
val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal))
- Timestamp.newBuilder()
+ Timestamp
+ .newBuilder()
.setSeconds((millis / 1000))
.setNanos(((millis % 1000) * 1000000).toInt)
.build()
@@ -201,7 +202,8 @@ private[sql] class ProtobufSerializer(
val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString)
val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds)
- val duration = Duration.newBuilder()
+ val duration = Duration
+ .newBuilder()
.setSeconds((millis / 1000))
.setNanos(((millis % 1000) * 1000000).toInt)
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
duration.build()
case _ =>
- throw new IncompatibleSchemaException(
- errorPrefix +
- s"schema is incompatible (sqlType = ${catalystType.sql}, " +
- s"protoType = ${fieldDescriptor.getJavaType})")
+ throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+ catalystPath,
+ toFieldStr(protoPath),
+ catalystType,
+ s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
+ s" ${fieldDescriptor.getType}")
}
}
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index fa2ec9b7cd4..4bd59ddce6c 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -23,11 +23,12 @@ import java.util.Locale
import scala.collection.JavaConverters._
import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message}
+import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -61,9 +62,9 @@ private[sql] object ProtobufUtils extends Logging {
protoPath: Seq[String],
catalystPath: Seq[String]) {
if (descriptor.getName == null) {
- throw new IncompatibleSchemaException(
- s"Attempting to treat ${descriptor.getName} as a RECORD, " +
- s"but it was: ${descriptor.getContainingType}")
+ throw QueryCompilationErrors.unknownProtobufMessageTypeError(
+ descriptor.getName,
+ descriptor.getContainingType().getName)
}
private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
@@ -79,30 +80,29 @@ private[sql] object ProtobufUtils extends Logging {
/**
* Validate that there are no Catalyst fields which don't have a matching Protobuf field,
- * throwing [[IncompatibleSchemaException]] if such extra fields are found. If
- * `ignoreNullable` is false, consider nullable Catalyst fields to be eligible to be an extra
- * field; otherwise, ignore nullable Catalyst fields when checking for extras.
+ * throwing [[AnalysisException]] if such extra fields are found. If `ignoreNullable` is
+ * false, consider nullable Catalyst fields to be eligible to be an extra field; otherwise,
+ * ignore nullable Catalyst fields when checking for extras.
*/
def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit =
catalystSchema.fields.foreach { sqlField =>
if (getFieldByName(sqlField.name).isEmpty &&
(!ignoreNullable || !sqlField.nullable)) {
- throw new IncompatibleSchemaException(
- s"Cannot find ${toFieldStr(catalystPath :+ sqlField.name)} in Protobuf schema")
+ throw QueryCompilationErrors.cannotFindCatalystTypeInProtobufSchemaError(
+ toFieldStr(catalystPath :+ sqlField.name))
}
}
/**
* Validate that there are no Protobuf fields which don't have a matching Catalyst field,
- * throwing [[IncompatibleSchemaException]] if such extra fields are found. Only required
- * (non-nullable) fields are checked; nullable fields are ignored.
+ * throwing [[AnalysisException]] if such extra fields are found. Only required (non-nullable)
+ * fields are checked; nullable fields are ignored.
*/
def validateNoExtraRequiredProtoFields(): Unit = {
val extraFields = protoFieldArray.toSet -- matchedFields.map(_.fieldDescriptor)
extraFields.filterNot(isNullable).foreach { extraField =>
- throw new IncompatibleSchemaException(
- s"Found ${toFieldStr(protoPath :+ extraField.getName())} in Protobuf schema " +
- "but there is no match in the SQL schema")
+ throw QueryCompilationErrors.cannotFindProtobufFieldInCatalystError(
+ toFieldStr(protoPath :+ extraField.getName()))
}
}
@@ -125,10 +125,11 @@ private[sql] object ProtobufUtils extends Logging {
case Seq(protoField) => Some(protoField)
case Seq() => None
case matches =>
- throw new IncompatibleSchemaException(
- s"Searching for '$name' in " +
- s"Protobuf schema at ${toFieldStr(protoPath)} gave ${matches.size} matches. " +
- s"Candidates: " + matches.map(_.getName()).mkString("[", ", ", "]"))
+ throw QueryCompilationErrors.protobufFieldMatchError(
+ name,
+ toFieldStr(protoPath),
+ s"${matches.size}",
+ matches.map(_.getName()).mkString("[", ", ", "]"))
}
}
}
@@ -157,16 +158,12 @@ private[sql] object ProtobufUtils extends Logging {
val protobufClass = try {
Utils.classForName(protobufClassName)
} catch {
- case _: ClassNotFoundException =>
- val hasDots = protobufClassName.contains(".")
- throw new IllegalArgumentException(
- s"Could not load Protobuf class with name '$protobufClassName'" +
- (if (hasDots) "" else ". Ensure the class name includes package prefix.")
- )
+ case e: ClassNotFoundException =>
+ throw QueryCompilationErrors.protobufClassLoadError(protobufClassName, e)
}
if (!classOf[Message].isAssignableFrom(protobufClass)) {
- throw new IllegalArgumentException(s"$protobufClassName is not a Protobuf message type")
+ throw QueryCompilationErrors.protobufMessageTypeError(protobufClassName)
// TODO: Need to support V2. This might work with V2 classes too.
}
@@ -178,46 +175,70 @@ private[sql] object ProtobufUtils extends Logging {
}
def buildDescriptor(descFilePath: String, messageName: String): Descriptor = {
- val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc =>
- desc.getName == messageName || desc.getFullName == messageName
- }
+ // Find the first message descriptor that matches the name.
+ val descriptorOpt = parseFileDescriptorSet(descFilePath)
+ .flatMap { fileDesc =>
+ fileDesc.getMessageTypes.asScala.find { desc =>
+ desc.getName == messageName || desc.getFullName == messageName
+ }
+ }.headOption
- descriptor match {
+ descriptorOpt match {
case Some(d) => d
- case None =>
- throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor")
+ case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
}
}
- private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
+ private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
try {
val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
} catch {
case ex: InvalidProtocolBufferException =>
- // TODO move all the exceptions to core/src/main/resources/error/error-classes.json
- throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex)
+ throw QueryCompilationErrors.descriptorParseError(descFilePath, ex)
case ex: IOException =>
- throw new RuntimeException(
- "Error reading Protobuf descriptor file at path: " +
- descFilePath,
- ex)
+ throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
}
-
- val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0)
try {
- val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
- descriptorProto,
- new Array[Descriptors.FileDescriptor](0))
- if (fileDescriptor.getMessageTypes().isEmpty()) {
- throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName());
- }
- fileDescriptor
+ val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+ val fileDescriptorList: List[Descriptors.FileDescriptor] =
+ fileDescriptorSet.getFileList.asScala.map { fileDescriptorProto =>
+ buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
+ }.toList
+ fileDescriptorList
} catch {
case e: Descriptors.DescriptorValidationException =>
- throw new RuntimeException("Error constructing FileDescriptor", e)
+ throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e)
+ }
+ }
+
+ /**
+ * Recursively constructs file descriptors for all dependencies of the given
+ * FileDescriptorProto and returns the resulting FileDescriptor.
+ */
+ private def buildFileDescriptor(
+ fileDescriptorProto: FileDescriptorProto,
+ fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = {
+ val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
+ fileDescriptorProtoMap.get(dependency) match {
+ case Some(dependencyProto) =>
+ buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
+ case None =>
+ throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
+ }
}
+ Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray)
+ }
+
+ /**
+ * Returns a map from each descriptor proto's name (as recorded in the FileDescriptorSet) to the proto itself.
+ */
+ private def createDescriptorProtoMap(
+ fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = {
+ fileDescriptorSet.getFileList().asScala.map { descriptorProto =>
+ descriptorProto.getName() -> descriptorProto
+ }.toMap[String, FileDescriptorProto]
}
/**
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
index 4fca06fb5d8..6fcba3b8918 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.protobuf.ScalaReflectionLock
import org.apache.spark.sql.types._
@@ -62,14 +63,18 @@ object SchemaConverters {
case STRING => Some(StringType)
case BYTE_STRING => Some(BinaryType)
case ENUM => Some(StringType)
- case MESSAGE if fd.getMessageType.getName == "Duration" =>
+ case MESSAGE
+ if (fd.getMessageType.getName == "Duration" &&
+ fd.getMessageType.getFields.size() == 2 &&
+ fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+ fd.getMessageType.getFields.get(1).getName.equals("nanos")) =>
Some(DayTimeIntervalType.defaultConcreteType)
- case MESSAGE if fd.getMessageType.getName == "Timestamp" =>
- Some(TimestampType)
- // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not
- // expected to be TimestampType in Spark. How about verifying fields?
- // Same for "Duration". Only the Timestamp & Duration protos defined in
- // google.protobuf package should default to corresponding Catalylist types.
+ case MESSAGE
+ if (fd.getMessageType.getName == "Timestamp" &&
+ fd.getMessageType.getFields.size() == 2 &&
+ fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
+ fd.getMessageType.getFields.get(1).getName.equals("nanos")) =>
+ Some(TimestampType)
case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry =>
var keyType: DataType = NullType
var valueType: DataType = NullType
@@ -88,9 +93,7 @@ object SchemaConverters {
nullable = false))
case MESSAGE =>
if (existingRecordNames.contains(fd.getFullName)) {
- throw new IncompatibleSchemaException(s"""
- |Found recursive reference in Protobuf schema, which can not be processed by Spark:
- |${fd.toString()}""".stripMargin)
+ throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
}
val newRecordNames = existingRecordNames + fd.getFullName
@@ -100,10 +103,8 @@ object SchemaConverters {
.toSeq)
.filter(_.nonEmpty)
.map(StructType.apply)
- case _ =>
- throw new IncompatibleSchemaException(
- s"Cannot convert Protobuf type" +
- s" ${fd.getJavaType}")
+ case other =>
+ throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString)
}
dataType.map(dt =>
StructField(
@@ -111,7 +112,4 @@ object SchemaConverters {
if (fd.isRepeated) ArrayType(dt, containsNull = false) else dt,
nullable = !fd.isRequired && !fd.isRepeated))
}
-
- private[protobuf] class IncompatibleSchemaException(msg: String, ex: Throwable = null)
- extends Exception(msg, ex)
}
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
new file mode 100644
index 00000000000..4252f349cf0
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// cd connector/protobuf/src/test/resources/protobuf
+// protoc --java_out=./ basicmessage.proto
+// protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+import "nestedenum.proto";
+
+option java_outer_classname = "BasicMessageProto";
+
+message BasicMessage {
+ int64 id = 1;
+ string string_value = 2;
+ int32 int32_value = 3;
+ int64 int64_value = 4;
+ double double_value = 5;
+ float float_value = 6;
+ bool bool_value = 7;
+ bytes bytes_value = 8;
+ NestedEnum rnested_enum = 9;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
index 1deb193438c..0732de10858 100644
--- a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
+++ b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto
@@ -22,8 +22,6 @@ syntax = "proto3";
package org.apache.spark.sql.protobuf.protos;
option java_outer_classname = "CatalystTypes";
-// TODO: import one or more protobuf files.
-
message BooleanMsg {
bool bool_type = 1;
}
diff --git a/connector/protobuf/src/test/resources/protobuf/duration.proto b/connector/protobuf/src/test/resources/protobuf/duration.proto
new file mode 100644
index 00000000000..2e89a8db5b7
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/duration.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "DurationProto";
+
+message Duration {
+ int64 seconds = 1;
+ int32 nanos = 2;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc
index 6e3a3967277..d54ee4337a5 100644
Binary files a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc and b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc differ
diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
index 60f8c262141..2fef8495c5e 100644
--- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
@@ -15,13 +15,18 @@
* limitations under the License.
*/
// To compile and create test class:
-// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
-// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/functions_suite.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+// cd connector/protobuf/src/test/resources/protobuf
+// protoc --java_out=./ functions_suite.proto
+// protoc --include_imports --descriptor_set_out=functions_suite.desc --java_out=org/apache/spark/sql/protobuf/ functions_suite.proto
syntax = "proto3";
package org.apache.spark.sql.protobuf.protos;
+import "timestamp.proto";
+import "duration.proto";
+import "basicmessage.proto";
+
option java_outer_classname = "SimpleMessageProtos";
message SimpleMessageJavaTypes {
@@ -58,7 +63,7 @@ message SimpleMessageRepeated {
string key = 1;
string value = 2;
enum NestedEnum {
- ESTED_NOTHING = 0;
+ NESTED_NOTHING = 0;
NESTED_FIRST = 1;
NESTED_SECOND = 2;
}
@@ -72,17 +77,6 @@ message SimpleMessageRepeated {
repeated NestedEnum rnested_enum = 10;
}
-message BasicMessage {
- int64 id = 1;
- string string_value = 2;
- int32 int32_value = 3;
- int64 int64_value = 4;
- double double_value = 5;
- float float_value = 6;
- bool bool_value = 7;
- bytes bytes_value = 8;
-}
-
message RepeatedMessage {
repeated BasicMessage basic_message = 1;
}
@@ -119,7 +113,7 @@ message SimpleMessageEnum {
string key = 1;
string value = 2;
enum NestedEnum {
- ESTED_NOTHING = 0; // TODO: Fix the name.
+ NESTED_NOTHING = 0;
NESTED_FIRST = 1;
NESTED_SECOND = 2;
}
@@ -168,21 +162,10 @@ message requiredMsg {
int32 col_3 = 4;
}
-// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto
-message Timestamp {
- int64 seconds = 1;
- int32 nanos = 2;
-}
-
message timeStampMsg {
string key = 1;
Timestamp stmp = 2;
}
-// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/duration.proto
-message Duration {
- int64 seconds = 1;
- int32 nanos = 2;
-}
message durationMsg {
string key = 1;
diff --git a/connector/protobuf/src/test/resources/protobuf/nestedenum.proto b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto
new file mode 100644
index 00000000000..20e9005bec0
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "NestedEnumProto";
+
+enum NestedEnum {
+ NESTED_NOTHING = 0;
+ NESTED_FIRST = 1;
+ NESTED_SECOND = 2;
+}
diff --git a/connector/protobuf/src/test/resources/protobuf/timestamp.proto b/connector/protobuf/src/test/resources/protobuf/timestamp.proto
new file mode 100644
index 00000000000..7616cc2ccfc
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/timestamp.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf.protos;
+
+option java_outer_classname = "TimestampProto";
+
+message Timestamp {
+ int64 seconds = 1;
+ int32 nanos = 2;
+}
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 19774a2ad07..271c5b0fec8 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -36,7 +36,7 @@ class ProtobufCatalystDataConversionSuite
with SharedSparkSession
with ExpressionEvalHelper {
- private val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+ private val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.CatalystTypes$"
private def checkResultWithEval(
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 72280fb0d9e..199ef235f14 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
import com.google.protobuf.{ByteString, DynamicMessage}
import org.apache.spark.sql.{Column, QueryTest, Row}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated
import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated.NestedEnum
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DayTimeIntervalType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -36,7 +36,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
import testImplicits._
- val testFileDesc = testFile("protobuf/functions_suite.desc").replace("file:/", "/")
+ val testFileDesc = testFile("functions_suite.desc").replace("file:/", "/")
private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$"
/**
@@ -114,7 +114,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
.addRdoubleValue(1092093.654d)
.addRfloatValue(10903.0f)
.addRfloatValue(10902.0f)
- .addRnestedEnum(NestedEnum.ESTED_NOTHING)
+ .addRnestedEnum(NestedEnum.NESTED_NOTHING)
.addRnestedEnum(NestedEnum.NESTED_FIRST)
.build()
@@ -324,7 +324,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
.setField(messageEnumDesc.findFieldByName("value"), "value")
.setField(
messageEnumDesc.findFieldByName("nested_enum"),
- messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
+ messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_NOTHING"))
.setField(
messageEnumDesc.findFieldByName("nested_enum"),
messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
@@ -410,7 +410,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
checkWithFileAndClassName("recursiveB") {
case (name, descFilePathOpt) =>
- val e = intercept[IncompatibleSchemaException] {
+ val e = intercept[AnalysisException] {
df.select(
from_protobuf_wrapper($"messageB", name, descFilePathOpt).as("messageFromProto"))
.show()
@@ -446,7 +446,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
checkWithFileAndClassName("recursiveD") {
case (name, descFilePathOpt) =>
- val e = intercept[IncompatibleSchemaException] {
+ val e = intercept[AnalysisException] {
df.select(
from_protobuf_wrapper($"messageD", name, descFilePathOpt).as("messageFromProto"))
.show()
@@ -458,7 +458,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
}
test("Handle extra fields : oldProducer -> newConsumer") {
- val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+ val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
val oldProducer = ProtobufUtils.buildDescriptor(testFileDesc, "oldProducer")
val newConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "newConsumer")
@@ -498,7 +498,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
}
test("Handle extra fields : newProducer -> oldConsumer") {
- val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/")
+ val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/")
val newProducer = ProtobufUtils.buildDescriptor(testFileDesc, "newProducer")
val oldConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "oldConsumer")
@@ -587,19 +587,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
val df = Seq(basicMessage.toByteArray).toDF("value")
- checkWithFileAndClassName("BasicMessage") {
- case (name, descFilePathOpt) =>
- val resultFrom = df
- .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
- .where("sample.string_value == \"slam\"")
+ val resultFrom = df
+ .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample)
+ .where("sample.string_value == \"slam\"")
- val resultToFrom = resultFrom
- .select(to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'value)
- .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample)
- .where("sample.string_value == \"slam\"")
+ val resultToFrom = resultFrom
+ .select(to_protobuf_wrapper($"sample", "BasicMessage", Some(testFileDesc)) as 'value)
+ .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample)
+ .where("sample.string_value == \"slam\"")
- assert(resultFrom.except(resultToFrom).isEmpty)
- }
+ assert(resultFrom.except(resultToFrom).isEmpty)
}
test("Handle TimestampType between to_protobuf and from_protobuf") {
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index efc02524e68..840535654ed 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.protobuf
import com.google.protobuf.Descriptors.Descriptor
import com.google.protobuf.DynamicMessage
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.NoopFilters
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -35,7 +36,7 @@ class ProtobufSerdeSuite extends SharedSparkSession {
import ProtoSerdeSuite._
import ProtoSerdeSuite.MatchType._
- val testFileDesc = testFile("protobuf/serde_suite.desc").replace("file:/", "/")
+ val testFileDesc = testFile("serde_suite.desc").replace("file:/", "/")
private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$"
test("Test basic conversion") {
@@ -65,22 +66,24 @@ class ProtobufSerdeSuite extends SharedSparkSession {
test("Fail to convert with field type mismatch") {
val protoFile = ProtobufUtils.buildDescriptor(testFileDesc, "MissMatchTypeInRoot")
-
withFieldMatchType { fieldMatch =>
assertFailedConversionMessage(
protoFile,
Deserializer,
fieldMatch,
- "Cannot convert Protobuf field 'foo' to SQL field 'foo' because schema is incompatible " +
- s"(protoType = org.apache.spark.sql.protobuf.MissMatchTypeInRoot.foo " +
- s"LABEL_OPTIONAL LONG INT64, sqlType = ${CATALYST_STRUCT.head.dataType.sql})".stripMargin)
+ errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+ params = Map(
+ "protobufType" -> "MissMatchTypeInRoot",
+ "toType" -> toSQLType(CATALYST_STRUCT)))
assertFailedConversionMessage(
protoFile,
Serializer,
fieldMatch,
- s"Cannot convert SQL field 'foo' to Protobuf field 'foo' because schema is incompatible " +
- s"""(sqlType = ${CATALYST_STRUCT.head.dataType.sql}, protoType = LONG)""")
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "MissMatchTypeInRoot",
+ "toType" -> toSQLType(CATALYST_STRUCT)))
}
}
@@ -91,9 +94,22 @@ class ProtobufSerdeSuite extends SharedSparkSession {
.add("foo", new StructType().add("bar", IntegerType, nullable = false))
// serialize fails whether or not 'bar' is nullable
- val byNameMsg = "Cannot find field 'foo.bar' in Protobuf schema"
- assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg)
- assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg, nonnullCatalyst)
+ assertFailedConversionMessage(
+ protoFile,
+ Serializer,
+ BY_NAME,
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "FieldMissingInProto",
+ "toType" -> toSQLType(CATALYST_STRUCT)))
+
+ assertFailedConversionMessage(protoFile,
+ Serializer,
+ BY_NAME,
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "FieldMissingInProto",
+ "toType" -> toSQLType(nonnullCatalyst)))
}
test("Fail to convert with deeply nested field type mismatch") {
@@ -107,18 +123,21 @@ class ProtobufSerdeSuite extends SharedSparkSession {
protoFile,
Deserializer,
fieldMatch,
- s"Cannot convert Protobuf field 'top.foo.bar' to SQL field 'top.foo.bar' because schema " +
- s"is incompatible (protoType = org.apache.spark.sql.protobuf.protos.TypeMiss.bar " +
- s"LABEL_OPTIONAL LONG INT64, sqlType = INT)",
- catalyst)
+ catalyst,
+ errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+ params = Map(
+ "protobufType" -> "MissMatchTypeInDeepNested",
+ "toType" -> toSQLType(catalyst)))
assertFailedConversionMessage(
protoFile,
Serializer,
fieldMatch,
- "Cannot convert SQL field 'top.foo.bar' to Protobuf field 'top.foo.bar' because schema " +
- """is incompatible (sqlType = INT, protoType = LONG)""",
- catalyst)
+ catalyst,
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "MissMatchTypeInDeepNested",
+ "toType" -> toSQLType(catalyst)))
}
}
@@ -130,7 +149,10 @@ class ProtobufSerdeSuite extends SharedSparkSession {
protoFile,
Serializer,
BY_NAME,
- "Found field 'boo' in Protobuf schema but there is no match in the SQL schema")
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "FieldMissingInSQLRoot",
+ "toType" -> toSQLType(CATALYST_STRUCT)))
/* deserializing should work regardless of whether the extra field is missing
in SQL Schema or not */
@@ -144,7 +166,10 @@ class ProtobufSerdeSuite extends SharedSparkSession {
protoNestedFile,
Serializer,
BY_NAME,
- "Found field 'foo.baz' in Protobuf schema but there is no match in the SQL schema")
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ params = Map(
+ "protobufType" -> "FieldMissingInSQLNested",
+ "toType" -> toSQLType(CATALYST_STRUCT)))
/* deserializing should work regardless of whether the extra field is missing
in SQL Schema or not */
@@ -161,20 +186,28 @@ class ProtobufSerdeSuite extends SharedSparkSession {
protoSchema: Descriptor,
serdeFactory: SerdeFactory[_],
fieldMatchType: MatchType,
- expectedCauseMessage: String,
- catalystSchema: StructType = CATALYST_STRUCT): Unit = {
- val e = intercept[IncompatibleSchemaException] {
+ catalystSchema: StructType = CATALYST_STRUCT,
+ errorClass: String,
+ params: Map[String, String]): Unit = {
+
+ val e = intercept[AnalysisException] {
serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
}
+
val expectMsg = serdeFactory match {
case Deserializer =>
- s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}."
+ s"[CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE] Unable to convert" +
+ s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
case Serializer =>
- s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}."
+ s"[UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE] Unable to convert SQL type" +
+ s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
}
assert(e.getMessage === expectMsg)
- assert(e.getCause.getMessage === expectedCauseMessage)
+ checkError(
+ exception = e,
+ errorClass = errorClass,
+ parameters = params)
}
def withFieldMatchType(f: MatchType => Unit): Unit = {
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 053c2a7af21..7fc806752be 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -17,6 +17,31 @@
],
"sqlState" : "22005"
},
+ "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : {
+ "message" : [
+ "Error constructing FileDescriptor for <descFilePath>"
+ ]
+ },
+ "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : {
+ "message" : [
+ "Cannot convert Protobuf <protobufColumn> to SQL <sqlColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)."
+ ]
+ },
+ "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE" : {
+ "message" : [
+ "Unable to convert <protobufType> of Protobuf to SQL type <toType>."
+ ]
+ },
+ "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE" : {
+ "message" : [
+ "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because <data> cannot be written since it's not defined in ENUM <enumString>"
+ ]
+ },
+ "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE" : {
+ "message" : [
+ "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)."
+ ]
+ },
"CANNOT_DECODE_URL" : {
"message" : [
"Cannot decode url : <url>."
@@ -29,12 +54,22 @@
],
"sqlState" : "22007"
},
+ "CANNOT_LOAD_PROTOBUF_CLASS" : {
+ "message" : [
+ "Could not load Protobuf class with name <protobufClassName>. Ensure the class name includes package prefix."
+ ]
+ },
"CANNOT_PARSE_DECIMAL" : {
"message" : [
"Cannot parse decimal"
],
"sqlState" : "42000"
},
+ "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : {
+ "message" : [
+ "Error parsing file <descFilePath> descriptor byte[] into Descriptor object"
+ ]
+ },
"CANNOT_PARSE_TIMESTAMP" : {
"message" : [
"<message>. If necessary set <ansiConfig> to \"false\" to bypass this error."
@@ -551,6 +586,11 @@
"Invalid bucket file: <path>"
]
},
+ "INVALID_BYTE_STRING" : {
+ "message" : [
+ "The expected format is ByteString, but was <unsupported> (<class>)."
+ ]
+ },
"INVALID_COLUMN_OR_FIELD_DATA_TYPE" : {
"message" : [
"Column or field <name> is of type <type> while it's required to be <expectedType>."
@@ -600,6 +640,11 @@
"<value> is an invalid property value, please use quotes, e.g. SET <key>=<value>"
]
},
+ "INVALID_PROTOBUF_MESSAGE_TYPE" : {
+ "message" : [
+ "<protobufClassName> is not a Protobuf message type"
+ ]
+ },
"INVALID_SQL_SYNTAX" : {
"message" : [
"Invalid SQL syntax: <inputString>"
@@ -618,6 +663,11 @@
}
}
},
+ "MALFORMED_PROTOBUF_MESSAGE" : {
+ "message" : [
+ "Malformed Protobuf messages are detected in message deserialization. Parse Mode: <failFastMode>. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'."
+ ]
+ },
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [
"Unknown static partition column: <columnName>"
@@ -669,7 +719,12 @@
"No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
]
},
- "NO_UDF_INTERFACE_ERROR" : {
+ "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
+ "message" : [
+ "Cannot find <catalystFieldPath> in Protobuf schema"
+ ]
+ },
+ "NO_UDF_INTERFACE" : {
"message" : [
"UDF class <className> doesn't implement any UDF interface"
]
@@ -741,6 +796,46 @@
],
"sqlState" : "42000"
},
+ "PROTOBUF_DEPENDENCY_NOT_FOUND" : {
+ "message" : [
+ "Could not find dependency: <dependencyName>"
+ ]
+ },
+ "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" : {
+ "message" : [
+ "Error reading Protobuf descriptor file at path: <filePath>"
+ ]
+ },
+ "PROTOBUF_FIELD_MISSING" : {
+ "message" : [
+ "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>"
+ ]
+ },
+ "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA" : {
+ "message" : [
+ "Found <field> in Protobuf schema but there is no match in the SQL schema"
+ ]
+ },
+ "PROTOBUF_FIELD_TYPE_MISMATCH" : {
+ "message" : [
+ "Type mismatch encountered for field: <field>"
+ ]
+ },
+ "PROTOBUF_MESSAGE_NOT_FOUND" : {
+ "message" : [
+ "Unable to locate Message <messageName> in Descriptor"
+ ]
+ },
+ "PROTOBUF_TYPE_NOT_SUPPORT" : {
+ "message" : [
+ "Protobuf type not yet supported: <protobufType>."
+ ]
+ },
+ "RECURSIVE_PROTOBUF_SCHEMA" : {
+ "message" : [
+ "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
+ ]
+ },
"RENAME_SRC_PATH_NOT_FOUND" : {
"message" : [
"Failed to rename as <sourcePath> was not found"
@@ -827,6 +922,16 @@
"Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>"
]
},
+ "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE" : {
+ "message" : [
+ "Unable to convert SQL type <toType> to Protobuf type <protobufType>."
+ ]
+ },
+ "UNKNOWN_PROTOBUF_MESSAGE_TYPE" : {
+ "message" : [
+ "Attempting to treat <descriptorName> as a Message, but it was <containingType>"
+ ]
+ },
"UNPIVOT_REQUIRES_ATTRIBUTES" : {
"message" : [
"UNPIVOT requires all given <given> expressions to be columns when no <empty> expressions are given. These are not columns: [<expressions>]."
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f61753f7b1d..18667d1efea 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -723,7 +723,7 @@ object SparkProtobuf {
dependencyOverrides += "com.google.protobuf" % "protobuf-java" % protoVersion,
- (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources",
+ (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources" / "protobuf",
(Test / PB.targets) := Seq(
PB.gens.java -> target.value / "generated-test-sources"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 03d16856755..ea2f961509a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3008,7 +3008,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
def udfClassDoesNotImplementAnyUDFInterfaceError(className: String): Throwable = {
new AnalysisException(
- errorClass = "NO_UDF_INTERFACE_ERROR",
+ errorClass = "NO_UDF_INTERFACE",
messageParameters = Map("className" -> className))
}
@@ -3213,4 +3213,179 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map("expression" -> toSQLExpr(expression))
)
}
+
+ def cannotConvertProtobufTypeToSqlTypeError(
+ protobufColumn: String,
+ sqlColumn: Seq[String],
+ protobufType: String,
+ sqlType: DataType): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE",
+ messageParameters = Map(
+ "protobufColumn" -> protobufColumn,
+ "sqlColumn" -> toSQLId(sqlColumn),
+ "protobufType" -> protobufType,
+ "sqlType" -> toSQLType(sqlType)))
+ }
+
+ def cannotConvertCatalystTypeToProtobufTypeError(
+ sqlColumn: Seq[String],
+ protobufColumn: String,
+ sqlType: DataType,
+ protobufType: String): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE",
+ messageParameters = Map(
+ "sqlColumn" -> toSQLId(sqlColumn),
+ "protobufColumn" -> protobufColumn,
+ "sqlType" -> toSQLType(sqlType),
+ "protobufType" -> protobufType))
+ }
+
+ def cannotConvertCatalystTypeToProtobufEnumTypeError(
+ sqlColumn: Seq[String],
+ protobufColumn: String,
+ data: String,
+ enumString: String): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE",
+ messageParameters = Map(
+ "sqlColumn" -> toSQLId(sqlColumn),
+ "protobufColumn" -> protobufColumn,
+ "data" -> data,
+ "enumString" -> enumString))
+ }
+
+ def cannotConvertProtobufTypeToCatalystTypeError(
+ protobufType: String,
+ sqlType: DataType,
+ cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE",
+ messageParameters = Map(
+ "protobufType" -> protobufType,
+ "toType" -> toSQLType(sqlType)),
+ cause = Option(cause.getCause))
+ }
+
+ def cannotConvertSqlTypeToProtobufError(
+ protobufType: String,
+ sqlType: DataType,
+ cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE",
+ messageParameters = Map(
+ "protobufType" -> protobufType,
+ "toType" -> toSQLType(sqlType)),
+ cause = Option(cause.getCause))
+ }
+
+ def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_TYPE_NOT_SUPPORT",
+ messageParameters = Map("protobufType" -> protobufType))
+ }
+
+ def unknownProtobufMessageTypeError(
+ descriptorName: String,
+ containingType: String): Throwable = {
+ new AnalysisException(
+ errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+ messageParameters = Map(
+ "descriptorName" -> descriptorName,
+ "containingType" -> containingType))
+ }
+
+ def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = {
+ new AnalysisException(
+ errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA",
+ messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+ }
+
+ def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA",
+ messageParameters = Map("field" -> field))
+ }
+
+ def protobufFieldMatchError(
+ field: String,
+ protobufSchema: String,
+ matchSize: String,
+ matches: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_FIELD_MISSING",
+ messageParameters = Map(
+ "field" -> field,
+ "protobufSchema" -> protobufSchema,
+ "matchSize" -> matchSize,
+ "matches" -> matches))
+ }
+
+ def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_MESSAGE_NOT_FOUND",
+ messageParameters = Map("messageName" -> messageName))
+ }
+
+ def descriptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+ messageParameters = Map("descFilePath" -> descFilePath),
+ cause = Option(cause.getCause))
+ }
+
+ def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND",
+ messageParameters = Map("filePath" -> filePath),
+ cause = Option(cause.getCause))
+ }
+
+ def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+ messageParameters = Map("descFilePath" -> descFilePath),
+ cause = Option(cause.getCause))
+ }
+
+ def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+ new AnalysisException(
+ errorClass = "RECURSIVE_PROTOBUF_SCHEMA",
+ messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+ }
+
+ def protobufFieldTypeMismatchError(field: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+ messageParameters = Map("field" -> field))
+ }
+
+ def protobufClassLoadError(
+ protobufClassName: String,
+ cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "CANNOT_LOAD_PROTOBUF_CLASS",
+ messageParameters = Map("protobufClassName" -> protobufClassName),
+ cause = Option(cause.getCause))
+ }
+
+ def protobufMessageTypeError(protobufClassName: String): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_PROTOBUF_MESSAGE_TYPE",
+ messageParameters = Map("protobufClassName" -> protobufClassName))
+ }
+
+ def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+ new AnalysisException(
+ errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND",
+ messageParameters = Map("dependencyName" -> dependencyName))
+ }
+
+ def invalidByteStringFormatError(unsupported: Any): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_BYTE_STRING",
+ messageParameters = Map(
+ "unsupported" -> unsupported.toString,
+ "class" -> unsupported.getClass.toString))
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index f78ff23d269..b8febfffba1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2808,4 +2808,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
errorClass = "UNSUPPORTED_EMPTY_LOCATION",
messageParameters = Map.empty)
}
+
+ def malformedProtobufMessageDetectedInMessageParsingError(e: Throwable): Throwable = {
+ new SparkException(
+ errorClass = "MALFORMED_PROTOBUF_MESSAGE",
+ messageParameters = Map(
+ "failFastMode" -> FailFastMode.name),
+ cause = e)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 8086a0e97ec..bed647ef49f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -227,7 +227,7 @@ class QueryCompilationErrorsSuite
parameters = Map[String, String]())
}
- test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf interface") {
+ test("NO_UDF_INTERFACE: java udf class does not implement any udf interface") {
val className = "org.apache.spark.sql.errors.MyCastToString"
val e = intercept[AnalysisException](
spark.udf.registerJava(
@@ -237,7 +237,7 @@ class QueryCompilationErrorsSuite
)
checkError(
exception = e,
- errorClass = "NO_UDF_INTERFACE_ERROR",
+ errorClass = "NO_UDF_INTERFACE",
parameters = Map("className" -> className))
}