You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/02/15 02:24:55 UTC

[spark] branch master updated: [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c406472  [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
c406472 is described below

commit c40647297031e0cd3e2682990c2067b59b2aeaa1
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Feb 15 10:24:35 2019 +0800

    [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility
    
    ## What changes were proposed in this pull request?
    
    Currently, it looks like using `from_avro` and `to_avro` from the Java API requires the following:
    
    ```java
    import static org.apache.spark.sql.avro.package$.MODULE$;
    
    MODULE$.to_avro
    MODULE$.from_avro
    ```
    
    This PR deprecates both functions in the `avro` package object and moves them into a new `functions` object, in the same way as our `org.apache.spark.sql.functions`.
    
    Therefore, the Java side can import:
    
    ```java
    import static org.apache.spark.sql.avro.functions.*;
    ```
    
    and the Scala side can import:
    
    ```scala
    import org.apache.spark.sql.avro.functions._
    ```
    
    ## How was this patch tested?
    
    Manually tested, and unit tests for Java APIs were added.
    
    Closes #23784 from HyukjinKwon/SPARK-26870.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 docs/sql-data-sources-avro.md                      |  5 +-
 .../sql/avro/{package.scala => functions.scala}    | 12 ++--
 .../scala/org/apache/spark/sql/avro/package.scala  | 32 ++--------
 .../spark/sql/avro/JavaAvroFunctionsSuite.java     | 74 ++++++++++++++++++++++
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala |  7 +-
 5 files changed, 95 insertions(+), 35 deletions(-)

diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index b403a66..afb91ae 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -78,7 +78,7 @@ Both functions are currently only available in Scala and Java.
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.spark.sql.avro._
+import org.apache.spark.sql.avro.functions._
 
 // `from_avro` requires Avro schema in JSON string format.
 val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
@@ -109,7 +109,8 @@ val query = output
 </div>
 <div data-lang="java" markdown="1">
 {% highlight java %}
-import org.apache.spark.sql.avro.*;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.avro.functions.*;
 
 // `from_avro` requires Avro schema in JSON string format.
 String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
similarity index 92%
copy from external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
copy to external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
index dee8575..5ed7828 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -15,13 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.avro
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
 
-package object avro {
+
+// scalastyle:off: object.name
+object functions {
+// scalastyle:on: object.name
 
   /**
    * Converts a binary column of avro format into its corresponding catalyst value. The specified
@@ -31,7 +35,7 @@ package object avro {
    * @param data the binary column.
    * @param jsonFormatSchema the avro schema in JSON string format.
    *
-   * @since 2.4.0
+   * @since 3.0.0
    */
   @Experimental
   def from_avro(
@@ -64,7 +68,7 @@ package object avro {
    *
    * @param data the data column.
    *
-   * @since 2.4.0
+   * @since 3.0.0
    */
   @Experimental
   def to_avro(data: Column): Column = {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index dee8575..af0752e 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.annotation.Experimental
 
 package object avro {
@@ -34,30 +32,11 @@ package object avro {
    * @since 2.4.0
    */
   @Experimental
+  @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
   def from_avro(
       data: Column,
-      jsonFormatSchema: String): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
-  }
-
-  /**
-   * Converts a binary column of avro format into its corresponding catalyst value. The specified
-   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
-   * arbitrary result.
-   *
-   * @param data the binary column.
-   * @param jsonFormatSchema the avro schema in JSON string format.
-   * @param options options to control how the Avro record is parsed.
-   *
-   * @since 3.0.0
-   */
-  @Experimental
-  def from_avro(
-      data: Column,
-      jsonFormatSchema: String,
-      options: java.util.Map[String, String]): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, options.asScala.toMap))
-  }
+      jsonFormatSchema: String): Column =
+    org.apache.spark.sql.avro.functions.from_avro(data, jsonFormatSchema)
 
   /**
    * Converts a column into binary of avro format.
@@ -67,7 +46,6 @@ package object avro {
    * @since 2.4.0
    */
   @Experimental
-  def to_avro(data: Column): Column = {
-    new Column(CatalystDataToAvro(data.expr))
-  }
+  @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
+  def to_avro(data: Column): Column = org.apache.spark.sql.avro.functions.to_avro(data)
 }
diff --git a/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java
new file mode 100644
index 0000000..a448583
--- /dev/null
+++ b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.QueryTest$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.test.TestSparkSession;
+
+import static org.apache.spark.sql.avro.functions.to_avro;
+import static org.apache.spark.sql.avro.functions.from_avro;
+
+
+public class JavaAvroFunctionsSuite {
+  private transient TestSparkSession spark;
+
+  @Before
+  public void setUp() {
+    spark = new TestSparkSession();
+  }
+
+  @After
+  public void tearDown() {
+    spark.stop();
+  }
+
+  private static void checkAnswer(Dataset<Row> actual, Dataset<Row> expected) {
+    String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected.collectAsList());
+    if (errorMessage != null) {
+      Assert.fail(errorMessage);
+    }
+  }
+
+  @Test
+  public void testToAvroFromAvro() {
+    Dataset<Long> rangeDf = spark.range(10);
+    Dataset<Row> df = rangeDf.select(
+      rangeDf.col("id"), rangeDf.col("id").cast("string").as("str"));
+
+    Dataset<Row> avroDF =
+      df.select(
+        to_avro(df.col("id")).as("a"),
+        to_avro(df.col("str")).as("b"));
+
+    String avroTypeLong = "{\"type\": \"int\", \"name\": \"id\"}";
+    String avroTypeStr = "{\"type\": \"string\", \"name\": \"str\"}";
+
+    Dataset<Row> actual = avroDF.select(
+      from_avro(avroDF.col("a"), avroTypeLong),
+      from_avro(avroDF.col("b"), avroTypeStr));
+
+    checkAnswer(actual, df);
+  }
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 46a37d8..148a66c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -79,13 +79,16 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext with SQLTestUti
 
     intercept[SparkException] {
       avroStructDF.select(
-        from_avro('avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
+        org.apache.spark.sql.avro.functions.from_avro(
+          'avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
     }
 
     // For PERMISSIVE mode, the result should be row of null columns.
     val expected = (0 until count).map(_ => Row(Row(null, null)))
     checkAnswer(
-      avroStructDF.select(from_avro('avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
+      avroStructDF.select(
+        org.apache.spark.sql.avro.functions.from_avro(
+          'avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
       expected)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org