Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/24 03:45:32 UTC
[spark] branch branch-3.1 updated: [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ed9749b [SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
ed9749b is described below
commit ed9749b728e373d05f84ebd6c6b81b627657cebb
Author: Yuanjian Li <yu...@databricks.com>
AuthorDate: Thu Dec 24 12:44:37 2020 +0900
[SPARK-33659][SS] Document the current behavior for DataStreamWriter.toTable API
### What changes were proposed in this pull request?
Follow-up work for #30521; document the following behaviors in the API doc:
- Describe the effects when the given configurations (provider/partitionBy) conflict with the existing table.
- Document the limited functionality when creating a v2 table, and advise users to create the table beforehand so an unintended or incomplete table is not created (see the sketch after this list).
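For illustration, here is a minimal Scala sketch of the recommended "create the table first" workflow; the table name, schema, source, and checkpoint path are illustrative, not from this commit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("toTable-create-first")
      .getOrCreate()
    import spark.implicits._

    // Create the target table up front with exactly the properties you
    // need, so the streaming query never has to create it implicitly.
    spark.sql(
      "CREATE TABLE IF NOT EXISTS output_table (value BIGINT) USING parquet")

    // `rate` is a built-in testing source emitting (timestamp, value) rows.
    val stream = spark.readStream.format("rate").load().select($"value")

    // toTable (new in 3.1) starts the query writing into the table.
    val query = stream.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/toTable-example-ckpt")
      .toTable("output_table")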
### Why are the changes needed?
We don't yet have full support for v2 tables created by this API (TODO: SPARK-33638).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Document only.
Closes #30885 from xuanyuanking/SPARK-33659.
Authored-by: Yuanjian Li <yu...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 86c1cfc5791dae5f2ee8ccd5095dbeb2243baba6)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
python/pyspark/sql/streaming.py | 13 ++++++++++---
.../org/apache/spark/sql/streaming/DataStreamWriter.scala | 14 ++++++++++++--
.../spark/sql/streaming/test/DataStreamTableAPISuite.scala | 6 +++---
3 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5f12229..51941a6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1498,8 +1498,7 @@ class DataStreamWriter(object):
Starts the execution of the streaming query, which will continually output results to the
given table as new data arrives.
- A new table will be created if the table not exists. The returned
- :class:`StreamingQuery` object can be used to interact with the stream.
+ The returned :class:`StreamingQuery` object can be used to interact with the stream.
.. versionadded:: 3.1.0
@@ -1531,6 +1530,15 @@ class DataStreamWriter(object):
-----
This API is evolving.
+ For a v1 table, partitioning columns provided by `partitionBy` will be respected whether
+ the table exists or not. A new table will be created if the table does not exist.
+
+ For a v2 table, `partitionBy` will be respected only if the table does not exist; it will be
+ ignored if the table already exists. In addition, a v2 table created by this API lacks some
+ functionality (e.g., customized properties, options, and serde info). If you need these,
+ please create the v2 table manually before execution to avoid creating a table with
+ incomplete information.
+
Examples
--------
>>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
@@ -1543,7 +1551,6 @@ class DataStreamWriter(object):
... format='parquet',
... checkpointLocation='/tmp/checkpoint') # doctest: +SKIP
"""
- # TODO(SPARK-33659): document the current behavior for DataStreamWriter.toTable API
self.options(**options)
if outputMode is not None:
self.outputMode(outputMode)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 9e8dff3..46ab1de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -302,11 +302,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* Starts the execution of the streaming query, which will continually output results to the given
- * table as new data arrives. A new table will be created if the table not exists. The returned
- * [[StreamingQuery]] object can be used to interact with the stream.
+ * table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
+ * the stream.
+ *
+ * For a v1 table, partitioning columns provided by `partitionBy` will be respected whether
+ * the table exists or not. A new table will be created if the table does not exist.
+ *
+ * For a v2 table, `partitionBy` will be respected only if the table does not exist; it will
+ * be ignored if the table already exists. In addition, a v2 table created by this API lacks
+ * some functionality (e.g., customized properties, options, and serde info). If you need
+ * these, please create the v2 table manually before execution to avoid creating a table
+ * with incomplete information.
*
* @since 3.1.0
*/
+ @Evolving
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery = {
this.tableName = tableName
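To make the `partitionBy` semantics documented above concrete, here is a hedged spark-shell style sketch; the table names, derived column, and checkpoint path are illustrative, not from this commit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]").appName("partitionBy-demo").getOrCreate()
    import spark.implicits._

    // Derive a low-cardinality partition column from the rate source.
    val df = spark.readStream.format("rate").load()
      .withColumn("bucket", $"value" % 10)

    // v1 table: `partitionBy` is honored whether or not the table
    // exists, and a missing table is created automatically.
    val query = df.writeStream
      .format("parquet")
      .partitionBy("bucket")
      .option("checkpointLocation", "/tmp/ck-partitionBy")
      .toTable("v1_partitioned_table")

    // v2 table: `partitionBy` only takes effect when toTable has to
    // create the table; if the v2 table already exists, its declared
    // partitioning wins and the clause above is ignored.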
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 9cf6496..4c5c5e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -275,7 +275,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
- // query. This is because we loads files from the metadata log instead of listing them
+ // query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet")
.option("path", dir.getCanonicalPath).saveAsTable(tableName)
@@ -289,7 +289,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
- // query. This is because we loads files from the metadata log instead of listing them
+ // query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
@@ -302,7 +302,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
- // query. This is because we loads files from the metadata log instead of listing them
+ // query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write
.mode("append").format("parquet").save(dir.getCanonicalPath)