Posted to commits@spark.apache.org by gu...@apache.org on 2020/07/20 01:44:04 UTC

[spark] branch master updated: [SPARK-29157][SQL][PYSPARK] Add DataFrameWriterV2 to Python API

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ef3cad1  [SPARK-29157][SQL][PYSPARK] Add DataFrameWriterV2 to Python API
ef3cad1 is described below

commit ef3cad17a6c31707e537f3d7be9183fe26f1c60b
Author: zero323 <ms...@gmail.com>
AuthorDate: Mon Jul 20 10:42:33 2020 +0900

    [SPARK-29157][SQL][PYSPARK] Add DataFrameWriterV2 to Python API
    
    ### What changes were proposed in this pull request?
    
    - Adds `DataFrameWriterV2` class.
    - Adds `writeTo` method to `pyspark.sql.DataFrame`.
    - Adds related SQL partitioning functions (`years`, `months`, ..., `bucket`).
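
    A minimal usage sketch of the new API (assumes an existing DataFrame `df` and a configured v2 catalog;
    the table name `catalog.db.table`, the columns, and the `parquet` provider are illustrative):

    ```python
    from pyspark.sql.functions import years, bucket

    # Create or replace a v2 table, partitioned by a yearly transform of `ts`
    # and a hash bucket of `id`.
    df.writeTo("catalog.db.table").using("parquet").partitionedBy(
        years("ts"), bucket(16, "id")
    ).createOrReplace()

    # Append the DataFrame's contents to an existing v2 table.
    df.writeTo("catalog.db.table").append()
    ```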
    
    ### Why are the changes needed?
    
    Feature parity.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    TODO: Should we test against `org.apache.spark.sql.connector.InMemoryTableCatalog`? If so, how should it be exposed in Python tests?
    
    Closes #27331 from zero323/SPARK-29157.
    
    Authored-by: zero323 <ms...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/dataframe.py             |  18 +++-
 python/pyspark/sql/functions.py             | 112 ++++++++++++++++++++++
 python/pyspark/sql/readwriter.py            | 141 +++++++++++++++++++++++++++-
 python/pyspark/sql/tests/test_readwriter.py |  36 +++++++
 4 files changed, 305 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 023fbea..1027918 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -29,7 +29,7 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.sql.types import _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
 from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import *
 from pyspark.sql.pandas.conversion import PandasConversionMixin
@@ -2240,6 +2240,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         sinceversion=1.4,
         doc=":func:`drop_duplicates` is an alias for :func:`dropDuplicates`.")
 
+    @since(3.1)
+    def writeTo(self, table):
+        """
+        Create a write configuration builder for v2 sources.
+
+        This builder is used to configure and execute write operations.
+
+        For example, to append to, create, or replace existing tables.
+
+        >>> df.writeTo("catalog.db.table").append()  # doctest: +SKIP
+        >>> df.writeTo(                              # doctest: +SKIP
+        ...     "catalog.db.table"
+        ... ).partitionedBy("col").createOrReplace()
+        """
+        return DataFrameWriterV2(self, table)
+
 
 def _to_scala_map(sc, jm):
     """
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5a35210..3ca4eda 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3322,6 +3322,118 @@ def map_zip_with(col1, col2, f):
     return _invoke_higher_order_function("MapZipWith", [col1, col2], [f])
 
 
+# ---------------------- Partition transform functions --------------------------------
+
+@since(3.1)
+def years(col):
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into years.
+
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     years("ts")
+    ... ).createOrReplace()
+
+    .. warning::
+        This function can be used only in combination with
+        :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+        method of the `DataFrameWriterV2`.
+
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.years(_to_java_column(col)))
+
+
+@since(3.1)
+def months(col):
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into months.
+
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     months("ts")
+    ... ).createOrReplace()
+
+    .. warning::
+        This function can be used only in combination with
+        :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+        method of the `DataFrameWriterV2`.
+
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.months(_to_java_column(col)))
+
+
+@since(3.1)
+def days(col):
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into days.
+
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     days("ts")
+    ... ).createOrReplace()
+
+    .. warning::
+        This function can be used only in combination with
+        :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+        method of the `DataFrameWriterV2`.
+
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.days(_to_java_column(col)))
+
+
+@since(3.1)
+def hours(col):
+    """
+    Partition transform function: A transform for timestamps
+    to partition data into hours.
+
+    >>> df.writeTo("catalog.db.table").partitionedBy(   # doctest: +SKIP
+    ...     hours("ts")
+    ... ).createOrReplace()
+
+    .. warning::
+        This function can be used only in combination with
+        :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+        method of the `DataFrameWriterV2`.
+
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.hours(_to_java_column(col)))
+
+
+@since(3.1)
+def bucket(numBuckets, col):
+    """
+    Partition transform function: A transform for any type that partitions
+    by a hash of the input column.
+
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     bucket(42, "ts")
+    ... ).createOrReplace()
+
+    .. warning::
+        This function can be used only in combination with
+        :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+        method of the `DataFrameWriterV2`.
+
+    """
+    if not isinstance(numBuckets, (int, Column)):
+        raise TypeError(
+            "numBuckets should be a Column or and int, got {}".format(type(numBuckets))
+        )
+
+    sc = SparkContext._active_spark_context
+    numBuckets = (
+        _create_column_from_literal(numBuckets)
+        if isinstance(numBuckets, int)
+        else _to_java_column(numBuckets)
+    )
+    return Column(sc._jvm.functions.bucket(numBuckets, _to_java_column(col)))
+
+
 # ---------------------------- User Defined Function ----------------------------------
 
 @since(1.3)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a83aece..6925adf 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -18,7 +18,7 @@
 from py4j.java_gateway import JavaClass
 
 from pyspark import RDD, since
-from pyspark.sql.column import _to_seq
+from pyspark.sql.column import _to_seq, _to_java_column
 from pyspark.sql.types import *
 from pyspark.sql import utils
 from pyspark.sql.utils import to_str
@@ -1075,6 +1075,145 @@ class DataFrameWriter(OptionUtils):
         self.mode(mode)._jwrite.jdbc(url, table, jprop)
 
 
+class DataFrameWriterV2(object):
+    """
+    Interface used to write a :class:`pyspark.sql.dataframe.DataFrame`
+    to external storage using the v2 API.
+
+    .. versionadded:: 3.1.0
+    """
+
+    def __init__(self, df, table):
+        self._df = df
+        self._spark = df.sql_ctx
+        self._jwriter = df._jdf.writeTo(table)
+
+    @since(3.1)
+    def using(self, provider):
+        """
+        Specifies a provider for the underlying output data source.
+        Spark's default catalog supports "parquet", "json", etc.
+        """
+        self._jwriter.using(provider)
+        return self
+
+    @since(3.1)
+    def option(self, key, value):
+        """
+        Add a write option.
+        """
+        self._jwriter.option(key, to_str(value))
+        return self
+
+    @since(3.1)
+    def options(self, **options):
+        """
+        Add write options.
+        """
+        options = {k: to_str(v) for k, v in options.items()}
+        self._jwriter.options(options)
+        return self
+
+    @since(3.1)
+    def tableProperty(self, property, value):
+        """
+        Add a table property.
+        """
+        self._jwriter.tableProperty(property, value)
+        return self
+
+    @since(3.1)
+    def partitionedBy(self, col, *cols):
+        """
+        Partition the output table created by `create`, `createOrReplace`, or `replace` using
+        the given columns or transforms.
+
+        When specified, the table data will be stored by these values for efficient reads.
+
+        For example, when a table is partitioned by day, it may be stored
+        in a directory layout like:
+
+        * `table/day=2019-06-01/`
+        * `table/day=2019-06-02/`
+
+        Partitioning is one of the most widely used techniques to optimize physical data layout.
+        It provides a coarse-grained index for skipping unnecessary data reads when queries have
+        predicates on the partitioned columns. In order for partitioning to work well, the number
+        of distinct values in each column should typically be less than tens of thousands.
+
+        `col` and `cols` support only the following functions:
+
+        * :py:func:`pyspark.sql.functions.years`
+        * :py:func:`pyspark.sql.functions.months`
+        * :py:func:`pyspark.sql.functions.days`
+        * :py:func:`pyspark.sql.functions.hours`
+        * :py:func:`pyspark.sql.functions.bucket`
+
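+        A minimal sketch (the table name `catalog.db.table` and the columns are
+        illustrative and assume a configured v2 catalog):
+
+        >>> from pyspark.sql.functions import years, bucket  # doctest: +SKIP
+        >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+        ...     years("ts"), bucket(42, "id")
+        ... ).createOrReplace()
+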
+        """
+        col = _to_java_column(col)
+        cols = _to_seq(self._spark._sc, [_to_java_column(c) for c in cols])
+        # Forward the converted partitioning expressions to the underlying Java writer.
+        self._jwriter.partitionedBy(col, cols)
+        return self
+
+    @since(3.1)
+    def create(self):
+        """
+        Create a new table from the contents of the data frame.
+
+        The new table's schema, partition layout, properties, and other configuration will be
+        based on the configuration set on this writer.
+        """
+        self._jwriter.create()
+
+    @since(3.1)
+    def replace(self):
+        """
+        Replace an existing table with the contents of the data frame.
+
+        The existing table's schema, partition layout, properties, and other configuration will be
+        replaced with the contents of the data frame and the configuration set on this writer.
+        """
+        self._jwriter.replace()
+
+    @since(3.1)
+    def createOrReplace(self):
+        """
+        Create a new table or replace an existing table with the contents of the data frame.
+
+        The output table's schema, partition layout, properties,
+        and other configuration will be based on the contents of the data frame
+        and the configuration set on this writer.
+        If the table exists, its configuration and data will be replaced.
+        """
+        self._jwriter.createOrReplace()
+
+    @since(3.1)
+    def append(self):
+        """
+        Append the contents of the data frame to the output table.
+        """
+        self._jwriter.append()
+
+    @since(3.1)
+    def overwrite(self, condition):
+        """
+        Overwrite rows matching the given filter condition with the contents of the data frame in
+        the output table.
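+
+        A minimal sketch (`catalog.db.table` and the `ds` column are illustrative):
+
+        >>> from pyspark.sql.functions import col  # doctest: +SKIP
+        >>> df.writeTo("catalog.db.table").overwrite(  # doctest: +SKIP
+        ...     col("ds") == "2020-07-20"
+        ... )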
+        """
+        condition = _to_java_column(condition)
+        self._jwriter.overwrite(condition)
+
+    @since(3.1)
+    def overwritePartitions(self):
+        """
+        Overwrite all partitions for which the data frame contains at least one row with the contents
+        of the data frame in the output table.
+
+        This operation is equivalent to Hive's `INSERT OVERWRITE ... PARTITION`, which replaces
+        partitions dynamically depending on the contents of the data frame.
+        """
+        self._jwriter.overwritePartitions()
+
+
 def _test():
     import doctest
     import os
diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py
index 2530cc2..8e34d38 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -19,6 +19,8 @@ import os
 import shutil
 import tempfile
 
+from pyspark.sql.functions import col
+from pyspark.sql.readwriter import DataFrameWriterV2
 from pyspark.sql.types import *
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
@@ -163,6 +165,40 @@ class ReadwriterTests(ReusedSQLTestCase):
             self.assertEqual(6, self.spark.sql("select * from test_table").count())
 
 
+class ReadwriterV2Tests(ReusedSQLTestCase):
+    def test_api(self):
+        df = self.df
+        writer = df.writeTo("testcat.t")
+        self.assertIsInstance(writer, DataFrameWriterV2)
+        self.assertIsInstance(writer.option("property", "value"), DataFrameWriterV2)
+        self.assertIsInstance(writer.options(property="value"), DataFrameWriterV2)
+        self.assertIsInstance(writer.using("source"), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy("id"), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(col("id")), DataFrameWriterV2)
+        self.assertIsInstance(writer.tableProperty("foo", "bar"), DataFrameWriterV2)
+
+    def test_partitioning_functions(self):
+        import datetime
+        from pyspark.sql.functions import years, months, days, hours, bucket
+
+        df = self.spark.createDataFrame(
+            [(1, datetime.datetime(2000, 1, 1), "foo")],
+            ("id", "ts", "value")
+        )
+
+        writer = df.writeTo("testcat.t")
+
+        self.assertIsInstance(writer.partitionedBy(years("ts")), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(months("ts")), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(days("ts")), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(hours("ts")), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(bucket(11, "id")), DataFrameWriterV2)
+        self.assertIsInstance(writer.partitionedBy(bucket(11, col("id"))), DataFrameWriterV2)
+        self.assertIsInstance(
+            writer.partitionedBy(bucket(3, "id"), hours(col("ts"))), DataFrameWriterV2
+        )
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.sql.tests.test_readwriter import *

