You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/02/15 02:24:55 UTC
[spark] branch master updated: [SPARK-26870][SQL] Move
to_avro/from_avro into functions object due to Java compatibility
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c406472 [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
c406472 is described below
commit c40647297031e0cd3e2682990c2067b59b2aeaa1
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Feb 15 10:24:35 2019 +0800
[SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
## What changes were proposed in this pull request?
Currently, it looks like using `from_avro` and `to_avro` from the Java API side requires the following:
```java
import static org.apache.spark.sql.avro.package$.MODULE$;
MODULE$.to_avro
MODULE$.from_avro
```
This PR deprecates both functions in the `avro` package and moves them into a `functions` object, following the style of `org.apache.spark.sql.functions`.
Therefore, Java side can import:
```java
import static org.apache.spark.sql.avro.functions.*;
```
and Scala side can import:
```scala
import org.apache.spark.sql.avro.functions._
```
## How was this patch tested?
Manually tested, and unit tests for Java APIs were added.
Closes #23784 from HyukjinKwon/SPARK-26870.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
docs/sql-data-sources-avro.md | 5 +-
.../sql/avro/{package.scala => functions.scala} | 12 ++--
.../scala/org/apache/spark/sql/avro/package.scala | 32 ++--------
.../spark/sql/avro/JavaAvroFunctionsSuite.java | 74 ++++++++++++++++++++++
.../apache/spark/sql/avro/AvroFunctionsSuite.scala | 7 +-
5 files changed, 95 insertions(+), 35 deletions(-)
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index b403a66..afb91ae 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -78,7 +78,7 @@ Both functions are currently only available in Scala and Java.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
-import org.apache.spark.sql.avro._
+import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
@@ -109,7 +109,8 @@ val query = output
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-import org.apache.spark.sql.avro.*;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
similarity index 92%
copy from external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
copy to external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
index dee8575..5ed7828 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.avro
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
-package object avro {
+
+// scalastyle:off: object.name
+object functions {
+// scalastyle:on: object.name
/**
* Converts a binary column of avro format into its corresponding catalyst value. The specified
@@ -31,7 +35,7 @@ package object avro {
* @param data the binary column.
* @param jsonFormatSchema the avro schema in JSON string format.
*
- * @since 2.4.0
+ * @since 3.0.0
*/
@Experimental
def from_avro(
@@ -64,7 +68,7 @@ package object avro {
*
* @param data the data column.
*
- * @since 2.4.0
+ * @since 3.0.0
*/
@Experimental
def to_avro(data: Column): Column = {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index dee8575..af0752e 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import scala.collection.JavaConverters._
-
import org.apache.spark.annotation.Experimental
package object avro {
@@ -34,30 +32,11 @@ package object avro {
* @since 2.4.0
*/
@Experimental
+ @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
def from_avro(
data: Column,
- jsonFormatSchema: String): Column = {
- new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
- }
-
- /**
- * Converts a binary column of avro format into its corresponding catalyst value. The specified
- * schema must match the read data, otherwise the behavior is undefined: it may fail or return
- * arbitrary result.
- *
- * @param data the binary column.
- * @param jsonFormatSchema the avro schema in JSON string format.
- * @param options options to control how the Avro record is parsed.
- *
- * @since 3.0.0
- */
- @Experimental
- def from_avro(
- data: Column,
- jsonFormatSchema: String,
- options: java.util.Map[String, String]): Column = {
- new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, options.asScala.toMap))
- }
+ jsonFormatSchema: String): Column =
+ org.apache.spark.sql.avro.functions.from_avro(data, jsonFormatSchema)
/**
* Converts a column into binary of avro format.
@@ -67,7 +46,6 @@ package object avro {
* @since 2.4.0
*/
@Experimental
- def to_avro(data: Column): Column = {
- new Column(CatalystDataToAvro(data.expr))
- }
+ @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
+ def to_avro(data: Column): Column = org.apache.spark.sql.avro.functions.to_avro(data)
}
diff --git a/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java
new file mode 100644
index 0000000..a448583
--- /dev/null
+++ b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.QueryTest$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.test.TestSparkSession;
+
+import static org.apache.spark.sql.avro.functions.to_avro;
+import static org.apache.spark.sql.avro.functions.from_avro;
+
+
+public class JavaAvroFunctionsSuite {
+ private transient TestSparkSession spark;
+
+ @Before
+ public void setUp() {
+ spark = new TestSparkSession();
+ }
+
+ @After
+ public void tearDown() {
+ spark.stop();
+ }
+
+ private static void checkAnswer(Dataset<Row> actual, Dataset<Row> expected) {
+ String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected.collectAsList());
+ if (errorMessage != null) {
+ Assert.fail(errorMessage);
+ }
+ }
+
+ @Test
+ public void testToAvroFromAvro() {
+ Dataset<Long> rangeDf = spark.range(10);
+ Dataset<Row> df = rangeDf.select(
+ rangeDf.col("id"), rangeDf.col("id").cast("string").as("str"));
+
+ Dataset<Row> avroDF =
+ df.select(
+ to_avro(df.col("id")).as("a"),
+ to_avro(df.col("str")).as("b"));
+
+ String avroTypeLong = "{\"type\": \"long\", \"name\": \"id\"}";
+ String avroTypeStr = "{\"type\": \"string\", \"name\": \"str\"}";
+
+ Dataset<Row> actual = avroDF.select(
+ from_avro(avroDF.col("a"), avroTypeLong),
+ from_avro(avroDF.col("b"), avroTypeStr));
+
+ checkAnswer(actual, df);
+ }
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 46a37d8..148a66c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -79,13 +79,16 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext with SQLTestUti
intercept[SparkException] {
avroStructDF.select(
- from_avro('avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
+ org.apache.spark.sql.avro.functions.from_avro(
+ 'avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
}
// For PERMISSIVE mode, the result should be row of null columns.
val expected = (0 until count).map(_ => Row(Row(null, null)))
checkAnswer(
- avroStructDF.select(from_avro('avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
+ avroStructDF.select(
+ org.apache.spark.sql.avro.functions.from_avro(
+ 'avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
expected)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org