You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/03/11 01:15:27 UTC

[spark] branch master updated: [SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3729efb  [SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs
3729efb is described below

commit 3729efb4d0420700a396c79a83a1d5db25ac3bcb
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Mar 11 10:15:07 2019 +0900

    [SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs
    
    ## What changes were proposed in this pull request?
    
    Avro has been a built-in but external data source module since Spark 2.4, but the `from_avro` and `to_avro` APIs are not yet supported in PySpark.
    
    In this PR I've made them available from pyspark.
    
    ## How was this patch tested?
    
    Please see the Python API examples that I've added.
    
    cd docs/
    SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll build
    Then manually checked the generated web page.
    
    Closes #23797 from gaborgsomogyi/SPARK-26856.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 dev/sparktestsupport/modules.py          |   3 +-
 docs/sql-data-sources-avro.md            |  31 +++++++
 python/docs/pyspark.sql.rst              |   6 ++
 python/pyspark/sql/avro/__init__.py      |  18 +++++
 python/pyspark/sql/avro/functions.py     | 134 +++++++++++++++++++++++++++++++
 python/pyspark/streaming/kinesis.py      |  30 ++-----
 python/pyspark/testing/streamingutils.py |  31 +------
 python/pyspark/testing/utils.py          |  25 ++++++
 python/pyspark/util.py                   |  27 +++++++
 9 files changed, 253 insertions(+), 52 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index eef7f25..d496eec 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -341,7 +341,7 @@ pyspark_core = Module(
 
 pyspark_sql = Module(
     name="pyspark-sql",
-    dependencies=[pyspark_core, hive],
+    dependencies=[pyspark_core, hive, avro],
     source_file_regexes=[
         "python/pyspark/sql"
     ],
@@ -360,6 +360,7 @@ pyspark_sql = Module(
         "pyspark.sql.streaming",
         "pyspark.sql.udf",
         "pyspark.sql.window",
+        "pyspark.sql.avro.functions",
         # unittests
         "pyspark.sql.tests.test_appsubmit",
         "pyspark.sql.tests.test_arrow",
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index afb91ae..40d53fb 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -139,6 +139,37 @@ StreamingQuery query = output
 
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.avro.functions import from_avro, to_avro
+
+# `from_avro` requires Avro schema in JSON string format.
+jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
+
+df = spark\
+  .readStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("subscribe", "topic1")\
+  .load()
+
+# 1. Decode the Avro data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `name` in Avro format.
+output = df\
+  .select(from_avro("value", jsonFormatSchema).alias("user"))\
+  .where('user.favorite_color == "red"')\
+  .select(to_avro("user.name").alias("value"))
+
+query = output\
+  .writeStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("topic", "topic2")\
+  .start()
+
+{% endhighlight %}
+</div>
 </div>
 
 ## Data Source Option
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 5c3b7e2..5da7b44 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -23,6 +23,12 @@ pyspark.sql.functions module
     :members:
     :undoc-members:
 
+pyspark.sql.avro.functions module
+---------------------------------
+.. automodule:: pyspark.sql.avro.functions
+    :members:
+    :undoc-members:
+
 pyspark.sql.streaming module
 ----------------------------
 .. automodule:: pyspark.sql.streaming
diff --git a/python/pyspark/sql/avro/__init__.py b/python/pyspark/sql/avro/__init__.py
new file mode 100644
index 0000000..8f01fbb
--- /dev/null
+++ b/python/pyspark/sql/avro/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ['functions']
diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
new file mode 100644
index 0000000..81686df
--- /dev/null
+++ b/python/pyspark/sql/avro/functions.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collections of builtin avro functions
+"""
+
+
+from pyspark import since, SparkContext
+from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+
+@ignore_unicode_prefix
+@since(3.0)
+def from_avro(data, jsonFormatSchema, options={}):
+    """
+    Converts a binary column of avro format into its corresponding catalyst value. The specified
+    schema must match the read data, otherwise the behavior is undefined: it may fail or return
+    arbitrary result.
+
+    Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the binary column.
+    :param jsonFormatSchema: the avro schema in JSON string format.
+    :param options: options to control how the Avro record is parsed.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import from_avro, to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
+    >>> avroDf.collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
+    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
+    ...     "fields":[{"name":"age","type":["long","null"]},
+    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
+    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
+    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
+            _to_java_column(data), jsonFormatSchema, options)
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
+        raise
+    return Column(jc)
+
+
+@ignore_unicode_prefix
+@since(3.0)
+def to_avro(data):
+    """
+    Converts a column into binary of avro format.
+
+    Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param data: the data column.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_avro(df.value).alias("avro")).collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
+        raise
+    return Column(jc)
+
+
+def _test():
+    import os
+    import sys
+    from pyspark.testing.utils import search_jar
+    avro_jar = search_jar("external/avro", "spark-avro")
+    if avro_jar is None:
+        print(
+            "Skipping all Avro Python tests as the optional Avro project was "
+            "not compiled into a JAR. To run these tests, "
+            "you need to build Spark with 'build/sbt -Pavro package' or "
+            "'build/mvn -Pavro package' before running this test.")
+        sys.exit(0)
+    else:
+        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+        jars_args = "--jars %s" % avro_jar
+        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+    import doctest
+    from pyspark.sql import Row, SparkSession
+    import pyspark.sql.avro.functions
+    globs = pyspark.sql.avro.functions.__dict__.copy()
+    spark = SparkSession.builder\
+        .master("local[4]")\
+        .appName("sql.avro.functions tests")\
+        .getOrCreate()
+    globs['spark'] = spark
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.avro.functions, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index b334882..4ed9f2a 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -18,6 +18,8 @@
 from pyspark.serializers import NoOpSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming import DStream
+from pyspark.util import _print_missing_jar
+
 
 __all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
 
@@ -82,7 +84,11 @@ class KinesisUtils(object):
             helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
         except TypeError as e:
             if str(e) == "'JavaPackage' object is not callable":
-                KinesisUtils._printErrorMsg(ssc.sparkContext)
+                _print_missing_jar(
+                    "Streaming's Kinesis",
+                    "streaming-kinesis-asl",
+                    "streaming-kinesis-asl-assembly",
+                    ssc.sparkContext.version)
             raise
         jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                       regionName, initialPositionInStream, jduration, jlevel,
@@ -91,28 +97,6 @@ class KinesisUtils(object):
         stream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))
 
-    @staticmethod
-    def _printErrorMsg(sc):
-        print("""
-________________________________________________________________________________________________
-
-  Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
-
-  1. Include the Kinesis library and its dependencies with in the
-     spark-submit command as
-
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
-
-  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
-     Then, include the jar in the spark-submit command as
-
-     $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
-
 
 class InitialPositionInStream(object):
     LATEST, TRIM_HORIZON = (0, 1)
diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py
index 85a2fa1..3bed507 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import glob
 import os
 import tempfile
 import time
@@ -22,32 +21,7 @@ import unittest
 
 from pyspark import SparkConf, SparkContext, RDD
 from pyspark.streaming import StreamingContext
-
-
-def search_kinesis_asl_assembly_jar():
-    kinesis_asl_assembly_dir = os.path.join(
-        os.environ["SPARK_HOME"], "external/kinesis-asl-assembly")
-
-    # We should ignore the following jars
-    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
-
-    # Search jar in the project dir using the jar name_prefix for both sbt build and maven
-    # build because the artifact jars are in different directories.
-    name_prefix = "spark-streaming-kinesis-asl-assembly"
-    sbt_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix))
-    maven_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix))
-    jar_paths = sbt_build + maven_build
-    jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
-
-    if not jars:
-        return None
-    elif len(jars) > 1:
-        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
-                         "remove all but one") % (", ".join(jars)))
-    else:
-        return jars[0]
+from pyspark.testing.utils import search_jar
 
 
 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -59,7 +33,8 @@ if should_skip_kinesis_tests:
         "Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' "
         "was not set.")
 else:
-    kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+    kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
+                                          "spark-streaming-kinesis-asl-assembly")
     if kinesis_asl_assembly_jar is None:
         kinesis_requirement_message = (
             "Skipping all Kinesis Python tests as the optional Kinesis project was "
diff --git a/python/pyspark/testing/utils.py b/python/pyspark/testing/utils.py
index 7df0aca..c6a5281 100644
--- a/python/pyspark/testing/utils.py
+++ b/python/pyspark/testing/utils.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import glob
 import os
 import struct
 import sys
@@ -100,3 +101,27 @@ class ByteArrayOutput(object):
 
     def close(self):
         pass
+
+
+def search_jar(project_relative_path, jar_name_prefix):
+    project_full_path = os.path.join(
+        os.environ["SPARK_HOME"], project_relative_path)
+
+    # We should ignore the following jars
+    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+
+    # Search jar in the project dir using the jar name_prefix for both sbt build and maven
+    # build because the artifact jars are in different directories.
+    sbt_build = glob.glob(os.path.join(
+        project_full_path, "target/scala-*/%s*.jar" % jar_name_prefix))
+    maven_build = glob.glob(os.path.join(
+        project_full_path, "target/%s*.jar" % jar_name_prefix))
+    jar_paths = sbt_build + maven_build
+    jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
+
+    if not jars:
+        return None
+    elif len(jars) > 1:
+        raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars)))
+    else:
+        return jars[0]
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index f906f49..d0ecd43 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -106,6 +106,33 @@ def fail_on_stopiteration(f):
     return wrapper
 
 
+def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
+    print("""
+________________________________________________________________________________________________
+
+  Spark %(lib_name)s libraries not found in class path. Try one of the following.
+
+  1. Include the %(lib_name)s library and its dependencies with in the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...
+
+________________________________________________________________________________________________
+
+""" % {
+        "lib_name": lib_name,
+        "pkg_name": pkg_name,
+        "jar_name": jar_name,
+        "spark_version": spark_version
+    })
+
+
 if __name__ == "__main__":
     import doctest
     (failure_count, test_count) = doctest.testmod()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org