You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/03/29 07:20:28 UTC

[spark] branch master updated: [SPARK-42956][CONNECT] Support avro functions for Scala client

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e586e212ca2 [SPARK-42956][CONNECT] Support avro functions for Scala client
e586e212ca2 is described below

commit e586e212ca2eb1d5f30443b6ee97bae8f6498cdb
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Mar 29 16:20:08 2023 +0900

    [SPARK-42956][CONNECT] Support avro functions for Scala client
    
    ### What changes were proposed in this pull request?
    This PR adds support for the Avro functions (`from_avro` and `to_avro`) to the Spark Connect Scala client.
    
    ### Why are the changes needed?
    It increases the API coverage of the Spark Connect JVM client.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Added new tests
    - Checked with Scala 2.13
    
    Closes #40584 from LuciferYang/SPARK-42956.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../org/apache/spark/sql/avro/functions.scala      |  97 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     |   2 +-
 .../org/apache/spark/sql/FunctionTestSuite.scala   |   8 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  23 +++++
 .../explain-results/from_avro_with_options.explain |   2 +
 .../from_avro_without_options.explain              |   2 +
 .../explain-results/to_avro_with_schema.explain    |   2 +
 .../explain-results/to_avro_without_schema.explain |   2 +
 .../queries/from_avro_with_options.json            |  50 +++++++++++
 .../queries/from_avro_with_options.proto.bin       | Bin 0 -> 173 bytes
 .../queries/from_avro_without_options.json         |  29 ++++++
 .../queries/from_avro_without_options.proto.bin    | Bin 0 -> 112 bytes
 .../query-tests/queries/to_avro_with_schema.json   |  29 ++++++
 .../queries/to_avro_with_schema.proto.bin          | Bin 0 -> 103 bytes
 .../queries/to_avro_without_schema.json            |  25 ++++++
 .../queries/to_avro_without_schema.proto.bin       | Bin 0 -> 69 bytes
 16 files changed, 270 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala
new file mode 100644
index 00000000000..c4b16ca0d5e
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.{fnWithOptions, lit}
+
+// scalastyle:off: object.name
+object functions {
+// scalastyle:on: object.name
+
+  /**
+   * Converts a binary column of Avro format into its corresponding catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
+   * an arbitrary result.
+   *
+   * @param data
+   *   the binary column.
+   * @param jsonFormatSchema
+   *   the Avro schema in JSON string format.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_avro(data: Column, jsonFormatSchema: String): Column = {
+    Column.fn("from_avro", data, lit(jsonFormatSchema))
+  }
+
+  /**
+   * Converts a binary column of Avro format into its corresponding catalyst value. The specified
+   * schema must match the actual schema of the read data, otherwise the behavior is undefined:
+   * it may fail or return an arbitrary result. To deserialize the data with a compatible and
+   * evolved schema, the expected Avro schema can be set via the option `avroSchema`.
+   *
+   * @param data
+   *   the binary column.
+   * @param jsonFormatSchema
+   *   the Avro schema in JSON string format.
+   * @param options
+   *   options to control how the Avro record is parsed. An empty map is equivalent to omitting it.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String,
+      options: java.util.Map[String, String]): Column = {
+    fnWithOptions("from_avro", options.asScala.iterator, data, lit(jsonFormatSchema))
+  }
+
+  /**
+   * Converts a column into binary of Avro format.
+   *
+   * @param data
+   *   the data column.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_avro(data: Column): Column = {
+    Column.fn("to_avro", data)
+  }
+
+  /**
+   * Converts a column into binary of Avro format.
+   *
+   * @param data
+   *   the data column.
+   * @param jsonFormatSchema
+   *   user-specified output Avro schema in JSON string format.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_avro(data: Column, jsonFormatSchema: String): Column = {
+    Column.fn("to_avro", data, lit(jsonFormatSchema))
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 729ee9ed6a0..c7bf3964924 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4647,7 +4647,7 @@ object functions {
    * Invoke a function with an options map as its last argument. If there are no options, its
    * column is dropped.
    */
-  private def fnWithOptions(
+  private[sql] def fnWithOptions(
       name: String,
       options: Iterator[(String, String)],
       arguments: Column*): Column = {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index 9e02eb13078..9fdf0c32522 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import java.util.Collections
 
+import org.apache.spark.sql.avro.{functions => avroFn}
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -225,6 +226,13 @@ class FunctionTestSuite extends ConnectFunSuite {
     schema_of_csv("x,y"),
     schema_of_csv(lit("x,y"), Collections.emptyMap()))
   testEquals("to_csv", to_csv(a), to_csv(a, Collections.emptyMap[String, String]))
+  testEquals(
+    "from_avro",
+    avroFn.from_avro(a, """{"type": "int", "name": "id"}"""),
+    avroFn.from_avro(
+      a,
+      """{"type": "int", "name": "id"}""",
+      Collections.emptyMap[String, String]))
 
   test("assert_true no message") {
     val e = assert_true(a).expr
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index f11576dc439..3b1537ce755 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -20,6 +20,7 @@ import java.nio.file.{Files, Path}
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
@@ -31,6 +32,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
+import org.apache.spark.sql.avro.{functions => avroFn}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.connect.client.SparkConnectClient
@@ -2167,4 +2169,25 @@ class PlanGenerationTestSuite
   test("table API with options") {
     session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
   }
+
+  /* Avro functions */
+  test("from_avro with options") {
+    binary.select(
+      avroFn.from_avro(
+        fn.col("bytes"),
+        """{"type": "int", "name": "id"}""",
+        Map("mode" -> "FAILFAST", "compression" -> "zstandard").asJava))
+  }
+
+  test("from_avro without options") {
+    binary.select(avroFn.from_avro(fn.col("bytes"), """{"type": "string", "name": "name"}"""))
+  }
+
+  test("to_avro with schema") {
+    simple.select(avroFn.to_avro(fn.col("a"), """{"type": "int", "name": "id"}"""))
+  }
+
+  test("to_avro without schema") {
+    simple.select(avroFn.to_avro(fn.col("id")))
+  }
 }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain
new file mode 100644
index 00000000000..1ef91ef8c36
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_with_options.explain
@@ -0,0 +1,2 @@
+Project [from_avro(bytes#0, {"type": "int", "name": "id"}, (mode,FAILFAST), (compression,zstandard)) AS from_avro(bytes)#0]
++- LocalRelation <empty>, [id#0L, bytes#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain
new file mode 100644
index 00000000000..8fca0b53416
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_avro_without_options.explain
@@ -0,0 +1,2 @@
+Project [from_avro(bytes#0, {"type": "string", "name": "name"}) AS from_avro(bytes)#0]
++- LocalRelation <empty>, [id#0L, bytes#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain
new file mode 100644
index 00000000000..cd2dc984e3f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_with_schema.explain
@@ -0,0 +1,2 @@
+Project [to_avro(a#0, Some({"type": "int", "name": "id"})) AS to_avro(a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain
new file mode 100644
index 00000000000..a5371c70ac7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_avro_without_schema.explain
@@ -0,0 +1,2 @@
+Project [to_avro(id#0L, None) AS to_avro(id)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json
new file mode 100644
index 00000000000..662aa746af2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.json
@@ -0,0 +1,50 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,bytes:binary\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "from_avro",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "bytes"
+          }
+        }, {
+          "literal": {
+            "string": "{\"type\": \"int\", \"name\": \"id\"}"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "mode"
+              }
+            }, {
+              "literal": {
+                "string": "FAILFAST"
+              }
+            }, {
+              "literal": {
+                "string": "compression"
+              }
+            }, {
+              "literal": {
+                "string": "zstandard"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin
new file mode 100644
index 00000000000..5da5c48b411
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_with_options.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json
new file mode 100644
index 00000000000..da2840f2d3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,bytes:binary\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "from_avro",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "bytes"
+          }
+        }, {
+          "literal": {
+            "string": "{\"type\": \"string\", \"name\": \"name\"}"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin
new file mode 100644
index 00000000000..4dd12e2dbe1
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/from_avro_without_options.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json
new file mode 100644
index 00000000000..6079e13bbfc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "to_avro",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "literal": {
+            "string": "{\"type\": \"int\", \"name\": \"id\"}"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin
new file mode 100644
index 00000000000..2843fbb67fe
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_with_schema.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json
new file mode 100644
index 00000000000..fa19d2120b9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "to_avro",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin
new file mode 100644
index 00000000000..4e7251125e4
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to_avro_without_schema.proto.bin differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org