You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/03/29 07:20:28 UTC
[spark] branch master updated: [SPARK-42956][CONNECT] Support avro functions for Scala client
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e586e212ca2 [SPARK-42956][CONNECT] Support avro functions for Scala client
e586e212ca2 is described below
commit e586e212ca2eb1d5f30443b6ee97bae8f6498cdb
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Mar 29 16:20:08 2023 +0900
[SPARK-42956][CONNECT] Support avro functions for Scala client
### What changes were proposed in this pull request?
This PR adds support for the Avro functions (from_avro and to_avro) to the Spark Connect Scala client.
### Why are the changes needed?
To extend the API coverage of the Spark Connect JVM client.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Added new tests
- Checked with Scala 2.13
Closes #40584 from LuciferYang/SPARK-42956.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/sql/avro/functions.scala | 97 +++++++++++++++++++++
.../scala/org/apache/spark/sql/functions.scala | 2 +-
.../org/apache/spark/sql/FunctionTestSuite.scala | 8 ++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 23 +++++
.../explain-results/from_avro_with_options.explain | 2 +
.../from_avro_without_options.explain | 2 +
.../explain-results/to_avro_with_schema.explain | 2 +
.../explain-results/to_avro_without_schema.explain | 2 +
.../queries/from_avro_with_options.json | 50 +++++++++++
.../queries/from_avro_with_options.proto.bin | Bin 0 -> 173 bytes
.../queries/from_avro_without_options.json | 29 ++++++
.../queries/from_avro_without_options.proto.bin | Bin 0 -> 112 bytes
.../query-tests/queries/to_avro_with_schema.json | 29 ++++++
.../queries/to_avro_with_schema.proto.bin | Bin 0 -> 103 bytes
.../queries/to_avro_without_schema.json | 25 ++++++
.../queries/to_avro_without_schema.proto.bin | Bin 0 -> 69 bytes
16 files changed, 270 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala
new file mode 100644
index 00000000000..c4b16ca0d5e
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.{fnWithOptions, lit}
+
+// scalastyle:off: object.name
+object functions {
+// scalastyle:on: object.name
+
+  /**
+   * Decodes a binary column holding Avro-encoded data into the matching Catalyst value. The
+   * supplied schema has to agree with the data being read; when it does not, the behavior is
+   * undefined and the call may either fail or produce arbitrary results.
+   *
+   * @param data
+   *   the binary column.
+   * @param jsonFormatSchema
+   *   the Avro schema in JSON string format.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_avro(data: Column, jsonFormatSchema: String): Column =
+    Column.fn("from_avro", data, lit(jsonFormatSchema))
+
+  /**
+   * Decodes a binary column holding Avro-encoded data into the matching Catalyst value, using
+   * additional parsing options. The supplied schema has to agree with the actual schema of the
+   * data being read; when it does not, the behavior is undefined and the call may either fail or
+   * produce arbitrary results. To deserialize data written with a compatible, evolved schema,
+   * pass the expected Avro schema through the `avroSchema` option.
+   *
+   * @param data
+   *   the binary column.
+   * @param jsonFormatSchema
+   *   the Avro schema in JSON string format.
+   * @param options
+   *   options to control how the Avro record is parsed. An empty map builds the same
+   *   expression as the overload without options.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String,
+      options: java.util.Map[String, String]): Column = {
+    // The options travel as a trailing map argument; fnWithOptions drops it when empty.
+    val optionPairs = options.asScala.iterator
+    fnWithOptions("from_avro", optionPairs, data, lit(jsonFormatSchema))
+  }
+
+  /**
+   * Encodes a column as Avro binary data without an explicitly supplied output schema.
+   *
+   * @param data
+   *   the data column.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_avro(data: Column): Column =
+    Column.fn("to_avro", data)
+
+  /**
+   * Encodes a column as Avro binary data using a user-specified output schema.
+   *
+   * @param data
+   *   the data column.
+   * @param jsonFormatSchema
+   *   user-specified output Avro schema in JSON string format.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_avro(data: Column, jsonFormatSchema: String): Column =
+    Column.fn("to_avro", data, lit(jsonFormatSchema))
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 729ee9ed6a0..c7bf3964924 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4647,7 +4647,7 @@ object functions {
* Invoke a function with an options map as its last argument. If there are no options, its
* column is dropped.
*/
- private def fnWithOptions(
+ private[sql] def fnWithOptions(
name: String,
options: Iterator[(String, String)],
arguments: Column*): Column = {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index 9e02eb13078..9fdf0c32522 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
import java.util.Collections
+import org.apache.spark.sql.avro.{functions => avroFn}
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
@@ -225,6 +226,13 @@ class FunctionTestSuite extends ConnectFunSuite {
schema_of_csv("x,y"),
schema_of_csv(lit("x,y"), Collections.emptyMap()))
testEquals("to_csv", to_csv(a), to_csv(a, Collections.emptyMap[String, String]))
+  // The options-less overload and an empty options map should build the same expression.
+  testEquals(
+    "from_avro",
+    avroFn.from_avro(a, """{"type": "int", "name": "id"}"""),
+    avroFn.from_avro(a, """{"type": "int", "name": "id"}""", Collections.emptyMap()))
test("assert_true no message") {
val e = assert_true(a).expr
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index f11576dc439..3b1537ce755 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -20,6 +20,7 @@ import java.nio.file.{Files, Path}
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -31,6 +32,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{functions => fn}
+import org.apache.spark.sql.avro.{functions => avroFn}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.connect.client.SparkConnectClient
@@ -2167,4 +2169,25 @@ class PlanGenerationTestSuite
test("table API with options") {
session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
}
+
+  /* Avro functions */
+  // Golden plan files (e.g. from_avro_with_options.json) are named after these tests.
+  test("from_avro with options") {
+    val avroOptions = Map("mode" -> "FAILFAST", "compression" -> "zstandard").asJava
+    binary.select(
+      avroFn.from_avro(fn.col("bytes"), """{"type": "int", "name": "id"}""", avroOptions))
+  }
+
+  test("from_avro without options") {
+    val nameSchema = """{"type": "string", "name": "name"}"""
+    binary.select(avroFn.from_avro(fn.col("bytes"), nameSchema))
+  }
+
+  test("to_avro with schema") {
+    val idSchema = """{"type": "int", "name": "id"}"""
+    simple.select(avroFn.to_avro(fn.col("a"), idSchema))
+  }
+
+  test("to_avro without schema") {
+    simple.select(avroFn.to_avro(fn.col("id")))
+  }
}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain
new file mode 100644
index 00000000000..1ef91ef8c36
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain
@@ -0,0 +1,2 @@
+Project [from_avro(bytes#0, {"type": "int", "name": "id"}, (mode,FAILFAST), (compression,zstandard)) AS from_avro(bytes)#0]
++- LocalRelation <empty>, [id#0L, bytes#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain
new file mode 100644
index 00000000000..8fca0b53416
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain
@@ -0,0 +1,2 @@
+Project [from_avro(bytes#0, {"type": "string", "name": "name"}) AS from_avro(bytes)#0]
++- LocalRelation <empty>, [id#0L, bytes#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain
new file mode 100644
index 00000000000..cd2dc984e3f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain
@@ -0,0 +1,2 @@
+Project [to_avro(a#0, Some({"type": "int", "name": "id"})) AS to_avro(a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain
new file mode 100644
index 00000000000..a5371c70ac7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain
@@ -0,0 +1,2 @@
+Project [to_avro(id#0L, None) AS to_avro(id)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json
new file mode 100644
index 00000000000..662aa746af2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json
@@ -0,0 +1,50 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,bytes:binary\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "from_avro",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "bytes"
+ }
+ }, {
+ "literal": {
+ "string": "{\"type\": \"int\", \"name\": \"id\"}"
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "map",
+ "arguments": [{
+ "literal": {
+ "string": "mode"
+ }
+ }, {
+ "literal": {
+ "string": "FAILFAST"
+ }
+ }, {
+ "literal": {
+ "string": "compression"
+ }
+ }, {
+ "literal": {
+ "string": "zstandard"
+ }
+ }]
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin
new file mode 100644
index 00000000000..5da5c48b411
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json
new file mode 100644
index 00000000000..da2840f2d3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json
@@ -0,0 +1,29 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,bytes:binary\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "from_avro",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "bytes"
+ }
+ }, {
+ "literal": {
+ "string": "{\"type\": \"string\", \"name\": \"name\"}"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin
new file mode 100644
index 00000000000..4dd12e2dbe1
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json
new file mode 100644
index 00000000000..6079e13bbfc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json
@@ -0,0 +1,29 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "to_avro",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "literal": {
+ "string": "{\"type\": \"int\", \"name\": \"id\"}"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin
new file mode 100644
index 00000000000..2843fbb67fe
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json
new file mode 100644
index 00000000000..fa19d2120b9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "to_avro",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin
new file mode 100644
index 00000000000..4e7251125e4
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin differ
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org