Posted to commits@spark.apache.org by gu...@apache.org on 2019/03/11 01:15:27 UTC
[spark] branch master updated: [SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3729efb [SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs
3729efb is described below
commit 3729efb4d0420700a396c79a83a1d5db25ac3bcb
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Mon Mar 11 10:15:07 2019 +0900
[SPARK-26856][PYSPARK] Python support for from_avro and to_avro APIs
## What changes were proposed in this pull request?
Avro has been a built-in but external data source module since Spark 2.4, but the `from_avro` and `to_avro` APIs were not yet supported in PySpark.
In this PR I've made them available from PySpark.
## How was this patch tested?
Please see the Python API examples that I've added. The documentation build was also verified:
cd docs/
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll build
The generated web pages were then checked manually.
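
(Editor's note, not part of the commit: since spark-avro is an external module, the new functions only resolve once the package is on the classpath. A minimal sketch of one way to do that from PySpark; the artifact coordinate and version below are illustrative assumptions, so pick the ones matching your Spark/Scala build.)

from pyspark.sql import SparkSession

# Pull the external spark-avro module at session startup; the coordinate
# "org.apache.spark:spark-avro_2.12:3.0.0" is a hypothetical example.
spark = SparkSession.builder \
    .appName("avro-functions-demo") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.0") \
    .getOrCreate()

An equivalent is passing --packages to spark-submit, as the deployment section of the Avro guide describes.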
Closes #23797 from gaborgsomogyi/SPARK-26856.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
dev/sparktestsupport/modules.py | 3 +-
docs/sql-data-sources-avro.md | 31 +++++++
python/docs/pyspark.sql.rst | 6 ++
python/pyspark/sql/avro/__init__.py | 18 +++++
python/pyspark/sql/avro/functions.py | 134 +++++++++++++++++++++++++++++++
python/pyspark/streaming/kinesis.py | 30 ++-----
python/pyspark/testing/streamingutils.py | 31 +------
python/pyspark/testing/utils.py | 25 ++++++
python/pyspark/util.py | 27 +++++++
9 files changed, 253 insertions(+), 52 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index eef7f25..d496eec 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -341,7 +341,7 @@ pyspark_core = Module(
pyspark_sql = Module(
name="pyspark-sql",
- dependencies=[pyspark_core, hive],
+ dependencies=[pyspark_core, hive, avro],
source_file_regexes=[
"python/pyspark/sql"
],
@@ -360,6 +360,7 @@ pyspark_sql = Module(
"pyspark.sql.streaming",
"pyspark.sql.udf",
"pyspark.sql.window",
+ "pyspark.sql.avro.functions",
# unittests
"pyspark.sql.tests.test_appsubmit",
"pyspark.sql.tests.test_arrow",
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index afb91ae..40d53fb 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -139,6 +139,37 @@ StreamingQuery query = output
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.sql.avro.functions import from_avro, to_avro
+
+# `from_avro` requires an Avro schema in JSON string format.
+jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
+
+df = spark\
+ .readStream\
+ .format("kafka")\
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+ .option("subscribe", "topic1")\
+ .load()
+
+# 1. Decode the Avro data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `name` in Avro format.
+output = df\
+ .select(from_avro("value", jsonFormatSchema).alias("user"))\
+ .where('user.favorite_color == "red"')\
+ .select(to_avro("user.name").alias("value"))
+
+query = output\
+ .writeStream\
+ .format("kafka")\
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+ .option("topic", "topic2")\
+ .start()
+
+{% endhighlight %}
+</div>
</div>
## Data Source Option
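
(Editor's note, not part of the commit: the guide's example above is streaming-only. The doctests added in this patch double as a batch round trip; a minimal standalone sketch of the same flow, assuming an existing SparkSession `spark` with spark-avro deployed:)

from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro

# Encode a struct column to Avro binary, then decode it back with a schema.
df = spark.createDataFrame([(1, Row(name="Alice", age=2))], ("key", "value"))
avro_df = df.select(to_avro(df.value).alias("avro"))

json_format_schema = '''{"type":"record","name":"topLevelRecord","fields":
  [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
  "fields":[{"name":"age","type":["long","null"]},
  {"name":"name","type":["string","null"]}]},"null"]}]}'''

avro_df.select(from_avro(avro_df.avro, json_format_schema).alias("value")).show()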
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 5c3b7e2..5da7b44 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -23,6 +23,12 @@ pyspark.sql.functions module
:members:
:undoc-members:
+pyspark.sql.avro.functions module
+---------------------------------
+.. automodule:: pyspark.sql.avro.functions
+ :members:
+ :undoc-members:
+
pyspark.sql.streaming module
----------------------------
.. automodule:: pyspark.sql.streaming
diff --git a/python/pyspark/sql/avro/__init__.py b/python/pyspark/sql/avro/__init__.py
new file mode 100644
index 0000000..8f01fbb
--- /dev/null
+++ b/python/pyspark/sql/avro/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ['functions']
diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py
new file mode 100644
index 0000000..81686df
--- /dev/null
+++ b/python/pyspark/sql/avro/functions.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of built-in Avro functions.
+"""
+
+
+from pyspark import since, SparkContext
+from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+
+@ignore_unicode_prefix
+@since(3.0)
+def from_avro(data, jsonFormatSchema, options={}):
+ """
+ Converts a binary column in Avro format into its corresponding Catalyst value. The
+ specified schema must match the data being read; otherwise the behavior is undefined:
+ it may fail or return an arbitrary result.
+
+ Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+ application as per the deployment section of "Apache Avro Data Source Guide".
+
+ :param data: the binary column.
+ :param jsonFormatSchema: the avro schema in JSON string format.
+ :param options: options to control how the Avro record is parsed.
+
+ >>> from pyspark.sql import Row
+ >>> from pyspark.sql.avro.functions import from_avro, to_avro
+ >>> data = [(1, Row(name='Alice', age=2))]
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> avroDf = df.select(to_avro(df.value).alias("avro"))
+ >>> avroDf.collect()
+ [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+ >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
+ ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
+ ... "fields":[{"name":"age","type":["long","null"]},
+ ... {"name":"name","type":["string","null"]}]},"null"]}]}'''
+ >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
+ [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
+ """
+
+ sc = SparkContext._active_spark_context
+ try:
+ jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
+ _to_java_column(data), jsonFormatSchema, options)
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
+ _print_missing_jar("Avro", "avro", "avro", sc.version)
+ raise
+ return Column(jc)
+
+
+@ignore_unicode_prefix
+@since(3.0)
+def to_avro(data):
+ """
+ Converts a column into a binary column in Avro format.
+
+ Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+ application as per the deployment section of "Apache Avro Data Source Guide".
+
+ :param data: the data column.
+
+ >>> from pyspark.sql import Row
+ >>> from pyspark.sql.avro.functions import to_avro
+ >>> data = [(1, Row(name='Alice', age=2))]
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(to_avro(df.value).alias("avro")).collect()
+ [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+ """
+
+ sc = SparkContext._active_spark_context
+ try:
+ jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
+ _print_missing_jar("Avro", "avro", "avro", sc.version)
+ raise
+ return Column(jc)
+
+
+def _test():
+ import os
+ import sys
+ from pyspark.testing.utils import search_jar
+ avro_jar = search_jar("external/avro", "spark-avro")
+ if avro_jar is None:
+ print(
+ "Skipping all Avro Python tests as the optional Avro project was "
+ "not compiled into a JAR. To run these tests, "
+ "you need to build Spark with 'build/sbt -Pavro package' or "
+ "'build/mvn -Pavro package' before running this test.")
+ sys.exit(0)
+ else:
+ existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+ jars_args = "--jars %s" % avro_jar
+ os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+ import doctest
+ from pyspark.sql import Row, SparkSession
+ import pyspark.sql.avro.functions
+ globs = pyspark.sql.avro.functions.__dict__.copy()
+ spark = SparkSession.builder\
+ .master("local[4]")\
+ .appName("sql.avro.functions tests")\
+ .getOrCreate()
+ globs['spark'] = spark
+ (failure_count, test_count) = doctest.testmod(
+ pyspark.sql.avro.functions, globs=globs,
+ optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+ spark.stop()
+ if failure_count:
+ sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
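
(Editor's note, not part of the commit: the try/except blocks in from_avro and to_avro hinge on a py4j behavior worth spelling out. When a class is missing from the JVM classpath, py4j resolves the dotted name to a plain JavaPackage instead of a JavaClass, and calling a JavaPackage raises TypeError("'JavaPackage' object is not callable") — the exact message the patch matches on before printing deployment help. A hypothetical check built on that behavior, assuming an active SparkContext `sc`:)

from py4j.java_gateway import JavaPackage

def avro_jar_missing(sc):
    """Hypothetical helper: True if spark-avro classes do not resolve."""
    # A resolvable class comes back as a JavaClass; an unresolvable path
    # falls through to JavaPackage, which later fails at call time.
    return isinstance(sc._jvm.org.apache.spark.sql.avro.functions, JavaPackage)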
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index b334882..4ed9f2a 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -18,6 +18,8 @@
from pyspark.serializers import NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.streaming import DStream
+from pyspark.util import _print_missing_jar
+
__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
@@ -82,7 +84,11 @@ class KinesisUtils(object):
helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
- KinesisUtils._printErrorMsg(ssc.sparkContext)
+ _print_missing_jar(
+ "Streaming's Kinesis",
+ "streaming-kinesis-asl",
+ "streaming-kinesis-asl-assembly",
+ ssc.sparkContext.version)
raise
jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
regionName, initialPositionInStream, jduration, jlevel,
@@ -91,28 +97,6 @@ class KinesisUtils(object):
stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))
- @staticmethod
- def _printErrorMsg(sc):
- print("""
-________________________________________________________________________________________________
-
- Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
-
- 1. Include the Kinesis library and its dependencies with in the
- spark-submit command as
-
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
-
- 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
- Then, include the jar in the spark-submit command as
-
- $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
-
class InitialPositionInStream(object):
LATEST, TRIM_HORIZON = (0, 1)
diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py
index 85a2fa1..3bed507 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import glob
import os
import tempfile
import time
@@ -22,32 +21,7 @@ import unittest
from pyspark import SparkConf, SparkContext, RDD
from pyspark.streaming import StreamingContext
-
-
-def search_kinesis_asl_assembly_jar():
- kinesis_asl_assembly_dir = os.path.join(
- os.environ["SPARK_HOME"], "external/kinesis-asl-assembly")
-
- # We should ignore the following jars
- ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
-
- # Search jar in the project dir using the jar name_prefix for both sbt build and maven
- # build because the artifact jars are in different directories.
- name_prefix = "spark-streaming-kinesis-asl-assembly"
- sbt_build = glob.glob(os.path.join(
- kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix))
- maven_build = glob.glob(os.path.join(
- kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix))
- jar_paths = sbt_build + maven_build
- jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
-
- if not jars:
- return None
- elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
- "remove all but one") % (", ".join(jars)))
- else:
- return jars[0]
+from pyspark.testing.utils import search_jar
# Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -59,7 +33,8 @@ if should_skip_kinesis_tests:
"Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' "
"was not set.")
else:
- kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+ kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
+ "spark-streaming-kinesis-asl-assembly")
if kinesis_asl_assembly_jar is None:
kinesis_requirement_message = (
"Skipping all Kinesis Python tests as the optional Kinesis project was "
diff --git a/python/pyspark/testing/utils.py b/python/pyspark/testing/utils.py
index 7df0aca..c6a5281 100644
--- a/python/pyspark/testing/utils.py
+++ b/python/pyspark/testing/utils.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import glob
import os
import struct
import sys
@@ -100,3 +101,27 @@ class ByteArrayOutput(object):
def close(self):
pass
+
+
+def search_jar(project_relative_path, jar_name_prefix):
+ project_full_path = os.path.join(
+ os.environ["SPARK_HOME"], project_relative_path)
+
+ # We should ignore the following jars
+ ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+
+ # Search jar in the project dir using the jar name_prefix for both sbt build and maven
+ # build because the artifact jars are in different directories.
+ sbt_build = glob.glob(os.path.join(
+ project_full_path, "target/scala-*/%s*.jar" % jar_name_prefix))
+ maven_build = glob.glob(os.path.join(
+ project_full_path, "target/%s*.jar" % jar_name_prefix))
+ jar_paths = sbt_build + maven_build
+ jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
+
+ if not jars:
+ return None
+ elif len(jars) > 1:
+ raise Exception("Found multiple JARs: %s; please remove all but one" % (", ".join(jars)))
+ else:
+ return jars[0]
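
(Editor's note, not part of the commit: search_jar generalizes the Kinesis-only helper deleted above. A hedged usage sketch, assuming SPARK_HOME points at a Spark source checkout where external/avro has been built; the path below is a placeholder:)

import os
from pyspark.testing.utils import search_jar

os.environ.setdefault("SPARK_HOME", "/path/to/spark")  # hypothetical path

# Checks both sbt (target/scala-*/) and maven (target/) output directories,
# ignoring javadoc/sources/tests jars; returns None when nothing is built.
avro_jar = search_jar("external/avro", "spark-avro")
if avro_jar is None:
    print("spark-avro jar not built; the Avro doctests would skip themselves")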
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index f906f49..d0ecd43 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -106,6 +106,33 @@ def fail_on_stopiteration(f):
return wrapper
+def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
+ print("""
+________________________________________________________________________________________________
+
+ Spark %(lib_name)s libraries not found in class path. Try one of the following.
+
+ 1. Include the %(lib_name)s library and its dependencies within the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...
+
+________________________________________________________________________________________________
+
+""" % {
+ "lib_name": lib_name,
+ "pkg_name": pkg_name,
+ "jar_name": jar_name,
+ "spark_version": spark_version
+ })
+
+
if __name__ == "__main__":
import doctest
(failure_count, test_count) = doctest.testmod()
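
(Editor's note, not part of the commit: _print_missing_jar is the shared replacement for the Kinesis-only _printErrorMsg removed earlier. A sketch of the two call shapes that exist after this patch; the version string is illustrative:)

from pyspark.util import _print_missing_jar

# As invoked by from_avro/to_avro when the spark-avro jar is absent:
_print_missing_jar("Avro", "avro", "avro", "3.0.0")

# As invoked by KinesisUtils.createStream for the Kinesis assembly jar:
_print_missing_jar("Streaming's Kinesis", "streaming-kinesis-asl",
                   "streaming-kinesis-asl-assembly", "3.0.0")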