You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2022/11/08 10:19:09 UTC
[spark] branch master updated: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 75643f4e9b0 [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
75643f4e9b0 is described below
commit 75643f4e9b0622f8d6848a155e23e5f44c55559d
Author: SandishKumarHN <sa...@gmail.com>
AuthorDate: Tue Nov 8 13:18:54 2022 +0300
[SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
### What changes were proposed in this pull request?
Add a null check on the data generator's output after type conversion.
### Why are the changes needed?
To fix a test failure.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I manually tested all of the random numbers and ran the existing unit tests.
Closes #38515 from SandishKumarHN/SPARK-41015-UTests.
Authored-by: SandishKumarHN <sa...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../spark/sql/protobuf/utils/ProtobufUtils.scala | 2 +-
.../src/test/resources/protobuf/basicmessage.proto | 1 +
.../resources/protobuf/basicmessage_noimports.desc | 18 +++++++++++++++++
.../ProtobufCatalystDataConversionSuite.scala | 13 ++++++++----
.../sql/protobuf/ProtobufFunctionsSuite.scala | 14 +++++++++++++
.../spark/sql/protobuf/ProtobufSerdeSuite.scala | 23 ++++++++++++++++++++++
.../spark/sql/errors/QueryCompilationErrors.scala | 4 ++--
7 files changed, 68 insertions(+), 7 deletions(-)
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 4bd59ddce6c..52870be5fbe 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -208,7 +208,7 @@ private[sql] object ProtobufUtils extends Logging {
).toList
fileDescriptorList
} catch {
- case e: Descriptors.DescriptorValidationException =>
+ case e: Exception =>
throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e)
}
}
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
index 4252f349cf0..8f4c1bb8eae 100644
--- a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
@@ -17,6 +17,7 @@
// cd connector/protobuf/src/test/resources/protobuf
// protoc --java_out=./ basicmessage.proto
// protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
+// protoc --descriptor_set_out=basicmessage_noimports.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
syntax = "proto3";
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc
new file mode 100644
index 00000000000..26ba6552cb0
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc
@@ -0,0 +1,18 @@
+
+�
+basicmessage.proto$org.apache.spark.sql.protobuf.protosnestedenum.proto"�
+BasicMessage
+id (Rid!
+string_value ( RstringValue
+int32_value (R
+int32Value
+int64_value (R
+int64Value!
+double_value (RdoubleValue
+float_value (R
+floatValue
+
+bool_value (R boolValue
+bytes_value (R
+bytesValueS
+rnested_enum (20.org.apache.spark.sql.protobuf.protos.NestedEnumRrnestedEnumBBBasicMessageProtobproto3
\ No newline at end of file
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 271c5b0fec8..9f9b51006ca 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -123,16 +123,21 @@ class ProtobufCatalystDataConversionSuite
StringType -> ("StringMsg", ""))
testingTypes.foreach { dt =>
- val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
+ val seed = scala.util.Random.nextInt(RandomDataGenerator.MAX_STR_LEN)
test(s"single $dt with seed $seed") {
val (messageName, defaultValue) = catalystTypesToProtoMessages(dt.fields(0).dataType)
val rand = new scala.util.Random(seed)
val generator = RandomDataGenerator.forType(dt, rand = rand).get
- var data = generator()
- while (data.asInstanceOf[Row].get(0) == defaultValue) // Do not use default values, since
- data = generator() // from_protobuf() returns null in v3.
+ var data = generator().asInstanceOf[Row]
+ // Do not use default values, since from_protobuf() returns null in v3.
+ while (
+ data != null &&
+ (data.get(0) == defaultValue ||
+ (dt == BinaryType &&
+ data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
+ data = generator().asInstanceOf[Row]
val converter = CatalystTypeConverters.createToCatalystConverter(dt)
val input = Literal.create(converter(data), dt)
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 199ef235f14..00ec56f90a6 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -677,4 +677,18 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
=== inputDf.select("durationMsg.duration").take(1).toSeq(0).get(0))
}
}
+
+ test("raise cannot construct protobuf descriptor error") {
+ val df = Seq(ByteString.empty().toByteArray).toDF("value")
+ val testFileDescriptor = testFile("basicmessage_noimports.desc").replace("file:/", "/")
+
+ val e = intercept[AnalysisException] {
+ df.select(functions.from_protobuf($"value", "BasicMessage", testFileDescriptor) as 'sample)
+ .where("sample.string_value == \"slam\"").show()
+ }
+ checkError(
+ exception = e,
+ errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+ parameters = Map("descFilePath" -> testFileDescriptor))
+ }
}
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index 840535654ed..22b9d58bbd4 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -177,6 +177,29 @@ class ProtobufSerdeSuite extends SharedSparkSession {
withFieldMatchType(Deserializer.create(CATALYST_STRUCT, protoNestedFile, _))
}
+ test("raise cannot parse and construct protobuf descriptor error") {
+ // passing serde_suite.proto instead serde_suite.desc
+ var testFileDesc = testFile("serde_suite.proto").replace("file:/", "/")
+ val e1 = intercept[AnalysisException] {
+ ProtobufUtils.buildDescriptor(testFileDesc, "FieldMissingInSQLRoot")
+ }
+
+ checkError(
+ exception = e1,
+ errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+ parameters = Map("descFilePath" -> testFileDesc))
+
+ testFileDesc = testFile("basicmessage_noimports.desc").replace("file:/", "/")
+ val e2 = intercept[AnalysisException] {
+ ProtobufUtils.buildDescriptor(testFileDesc, "FieldMissingInSQLRoot")
+ }
+
+ checkError(
+ exception = e2,
+ errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+ parameters = Map("descFilePath" -> testFileDesc))
+ }
+
/**
* Attempt to convert `catalystSchema` to `protoSchema` (or vice-versa if `deserialize` is
* true), assert that it fails, and assert that the _cause_ of the thrown exception has a
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b56e1957f77..139ea236e49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3325,7 +3325,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
new AnalysisException(
errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
- messageParameters = Map.empty("descFilePath" -> descFilePath),
+ messageParameters = Map("descFilePath" -> descFilePath),
cause = Option(cause.getCause))
}
@@ -3339,7 +3339,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
new AnalysisException(
errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
- messageParameters = Map.empty("descFilePath" -> descFilePath),
+ messageParameters = Map("descFilePath" -> descFilePath),
cause = Option(cause.getCause))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org