Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/24 03:45:32 UTC

[spark] branch branch-3.1 updated: [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ed9749b  [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
ed9749b is described below

commit ed9749b728e373d05f84ebd6c6b81b627657cebb
Author: Yuanjian Li <yu...@databricks.com>
AuthorDate: Thu Dec 24 12:44:37 2020 +0900

    [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
    
    ### What changes were proposed in this pull request?
    Follow-up work for #30521: document the following behaviors in the API doc:
    
    - Describe the effects when the provided configurations (provider/partitionBy) conflict with the existing table.
    - Document the lack of functionality when creating a v2 table, and guide users to create the table beforehand so that an unintended or incomplete table is not created (see the sketch below).
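    
    A minimal PySpark sketch of the recommended flow (the `events` table, its schema, and the checkpoint path are placeholder assumptions for illustration, not part of this change): pre-create the table with the exact layout and properties you want, then start the streaming write with `toTable`:
    
    ```python
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    # Pre-create the target table so that its schema, format, and any table
    # properties are exactly what you want. For a v2 catalog, toTable() would
    # otherwise create the table with incomplete information (no customized
    # properties, options, or serde info).
    spark.sql("""
        CREATE TABLE IF NOT EXISTS events (value STRING, ts TIMESTAMP)
        USING parquet
    """)
    
    # A toy streaming source; any streaming DataFrame works here.
    sdf = (spark.readStream.format("rate").load()
           .selectExpr("CAST(value AS STRING) AS value", "timestamp AS ts"))
    
    # Start the streaming write into the pre-created table. The returned
    # StreamingQuery can be used to monitor or stop the stream.
    query = (sdf.writeStream
             .format("parquet")
             .option("checkpointLocation", "/tmp/checkpoint")
             .toTable("events"))
    ```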
    
    ### Why are the changes needed?
    We don't yet have full support for V2 tables created through this API (TODO: SPARK-33638).
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Document only.
    
    Closes #30885 from xuanyuanking/SPARK-33659.
    
    Authored-by: Yuanjian Li <yu...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 86c1cfc5791dae5f2ee8ccd5095dbeb2243baba6)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/streaming.py                            | 13 ++++++++++---
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala  | 14 ++++++++++++--
 .../spark/sql/streaming/test/DataStreamTableAPISuite.scala |  6 +++---
 3 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5f12229..51941a6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1498,8 +1498,7 @@ class DataStreamWriter(object):
         Starts the execution of the streaming query, which will continually output results to the
         given table as new data arrives.
 
-        A new table will be created if the table not exists. The returned
-        :class:`StreamingQuery` object can be used to interact with the stream.
+        The returned :class:`StreamingQuery` object can be used to interact with the stream.
 
         .. versionadded:: 3.1.0
 
@@ -1531,6 +1530,15 @@ class DataStreamWriter(object):
         -----
         This API is evolving.
 
+        For a v1 table, partitioning columns provided by `partitionBy` will be respected whether
+        the table exists or not. A new table will be created if the table does not exist.
+
+        For a v2 table, `partitionBy` will be ignored if the table already exists; it will be
+        respected only if the v2 table does not exist. In addition, the v2 table created by this
+        API lacks some functionality (e.g., customized properties, options, and serde info). If
+        you need them, please create the v2 table manually before starting the query to avoid
+        creating a table with incomplete information.
+
         Examples
         --------
         >>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
@@ -1543,7 +1551,6 @@ class DataStreamWriter(object):
         ...     format='parquet',
         ...     checkpointLocation='/tmp/checkpoint') # doctest: +SKIP
         """
-        # TODO(SPARK-33659): document the current behavior for DataStreamWriter.toTable API
         self.options(**options)
         if outputMode is not None:
             self.outputMode(outputMode)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 9e8dff3..46ab1de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -302,11 +302,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   /**
    * Starts the execution of the streaming query, which will continually output results to the given
-   * table as new data arrives. A new table will be created if the table not exists. The returned
-   * [[StreamingQuery]] object can be used to interact with the stream.
+   * table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
+   * the stream.
+   *
+   * For a v1 table, partitioning columns provided by `partitionBy` will be respected whether the
+   * table exists or not. A new table will be created if the table does not exist.
+   *
+   * For a v2 table, `partitionBy` will be ignored if the table already exists; it will be
+   * respected only if the v2 table does not exist. In addition, the v2 table created by this API
+   * lacks some functionality (e.g., customized properties, options, and serde info). If you need
+   * them, please create the v2 table manually before starting the query to avoid creating a table
+   * with incomplete information.
    *
    * @since 3.1.0
    */
+  @Evolving
   @throws[TimeoutException]
   def toTable(tableName: String): StreamingQuery = {
     this.tableName = tableName
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 9cf6496..4c5c5e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -275,7 +275,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
       val tableName = "stream_test"
       withTable(tableName) {
         // The file written by batch will not be seen after the table was written by a streaming
-        // query. This is because we loads files from the metadata log instead of listing them
+        // query. This is because we load files from the metadata log instead of listing them
         // using HDFS API.
         Seq(4, 5, 6).toDF("value").write.format("parquet")
           .option("path", dir.getCanonicalPath).saveAsTable(tableName)
@@ -289,7 +289,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val tableName = "stream_test"
     withTable(tableName) {
       // The file written by batch will not be seen after the table was written by a streaming
-      // query. This is because we loads files from the metadata log instead of listing them
+      // query. This is because we load files from the metadata log instead of listing them
       // using HDFS API.
       Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
 
@@ -302,7 +302,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
       val tableName = "stream_test"
       withTable(tableName) {
         // The file written by batch will not be seen after the table was written by a streaming
-        // query. This is because we loads files from the metadata log instead of listing them
+        // query. This is because we load files from the metadata log instead of listing them
         // using HDFS API.
         Seq(4, 5, 6).toDF("value").write
           .mode("append").format("parquet").save(dir.getCanonicalPath)

