Posted to commits@spark.apache.org by ru...@apache.org on 2023/03/10 00:52:02 UTC

[spark] branch branch-3.4 updated: [SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7e4f8703900 [SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter
7e4f8703900 is described below

commit 7e4f8703900a984be5fd118737eaa31e8ce445b1
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Fri Mar 10 08:51:23 2023 +0800

    [SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter
    
    ### What changes were proposed in this pull request?
    
    Fixes `DataFrameWriter.save` to work without a path parameter.
    
    ### Why are the changes needed?
    
    `DataFrameWriter.save` should work without a path parameter because some data sources, such as jdbc and noop, work without one.
    
    ```py
    >>> print(spark.range(10).write.format("noop").mode("append").save())
    Traceback (most recent call last):
    ...
    AssertionError: Invalid configuration of WriteCommand, neither path or table present.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Data sources that don't need a path parameter now work:
    
    ```py
    >>> print(spark.range(10).write.format("noop").mode("append").save())
    None
    ```
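
    Data sources that take connection options instead of a path benefit in the same way. Below is a minimal sketch mirroring the new `test_jdbc` case added in this patch; the in-memory Derby URL and table name are only illustrative:
    
    ```py
    >>> df = spark.range(10)
    >>> # `save()` without a path: the destination comes from the jdbc options.
    >>> df.write.format("jdbc").options(
    ...     url="jdbc:derby:memory:demo;create=true", dbtable="test_table"
    ... ).save()
    >>> spark.read.format("jdbc").options(
    ...     url="jdbc:derby:memory:demo", dbtable="test_table"
    ... ).load().count()
    10
    ```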
    
    ### How was this patch tested?
    
    Added a test.
    
    Closes #40356 from ueshin/issues/SPARK-42733/save.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
    (cherry picked from commit c2ee08baa98f21f664ec1966d63578346e4eebd8)
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../src/main/protobuf/spark/connect/commands.proto     |  6 +++++-
 .../sql/connect/planner/SparkConnectPlanner.scala      |  1 +
 .../sql/connect/planner/SparkConnectProtoSuite.scala   |  3 ++-
 python/pyspark/sql/connect/plan.py                     |  4 ----
 python/pyspark/sql/tests/connect/test_connect_plan.py  |  7 ++++---
 python/pyspark/sql/tests/test_datasources.py           | 18 ++++++++++++++++++
 6 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 5c0718aefeb..3ffbe83bded 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -83,7 +83,11 @@ message WriteOperation {
   // (Optional) Format value according to the Spark documentation. Examples are: text, parquet, delta.
   optional string source = 2;
 
-  // The destination of the write operation must be either a path or a table.
+  // (Optional)
+  //
+  // The destination of the write operation can be either a path or a table.
+  // If the destination is neither a path nor a table, such as jdbc and noop,
+  // the `save_type` should not be set.
   oneof save_type {
     string path = 3;
     SaveTable table = 4;
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 010d0236c74..f2478f548e7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1723,6 +1723,7 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
 
     writeOperation.getSaveTypeCase match {
+      case proto.WriteOperation.SaveTypeCase.SAVETYPE_NOT_SET => w.save()
       case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
       case proto.WriteOperation.SaveTypeCase.TABLE =>
         val tableName = writeOperation.getTable.getTableName
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 241b3dcb825..00ff6ac2fb6 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -554,7 +554,8 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
-  test("Writes fails without path or table") {
+  // TODO(SPARK-42733): Writes without path or table should work.
+  ignore("Writes fails without path or table") {
     assertThrows[UnsupportedOperationException] {
       transform(localRelation.write())
     }
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 8e4db63dc7e..4e31811a9e2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1436,10 +1436,6 @@ class WriteOperation(LogicalPlan):
                     )
         elif self.path is not None:
             plan.write_operation.path = self.path
-        else:
-            raise AssertionError(
-                "Invalid configuration of WriteCommand, neither path or table present."
-            )
 
         if self.mode is not None:
             wm = self.mode.lower()
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan.py b/python/pyspark/sql/tests/connect/test_connect_plan.py
index 115d189b742..d5cffa459d7 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -688,9 +688,10 @@ class SparkConnectPlanTests(PlanOnlyTestFixture):
         wo.mode = "overwrite"
         wo.source = "parquet"
 
-        # Missing path or table name.
-        with self.assertRaises(AssertionError):
-            wo.command(None)
+        p = wo.command(None)
+        self.assertIsNotNone(p)
+        self.assertFalse(p.write_operation.HasField("path"))
+        self.assertFalse(p.write_operation.HasField("table"))
 
         wo.path = "path"
         p = wo.command(None)
diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py
index 52880ae4631..9b6692a9d21 100644
--- a/python/pyspark/sql/tests/test_datasources.py
+++ b/python/pyspark/sql/tests/test_datasources.py
@@ -17,6 +17,7 @@
 
 import shutil
 import tempfile
+import uuid
 
 from pyspark.sql import Row
 from pyspark.sql.types import IntegerType, StructField, StructType, LongType, StringType
@@ -192,6 +193,23 @@ class DataSourcesTestsMixin:
         finally:
             shutil.rmtree(path)
 
+    def test_jdbc(self):
+        db = f"memory:{uuid.uuid4()}"
+        url = f"jdbc:derby:{db}"
+        dbtable = "test_table"
+
+        try:
+            df = self.spark.range(10)
+            df.write.format("jdbc").options(url=f"{url};create=true", dbtable=dbtable).save()
+            readback = self.spark.read.format("jdbc").options(url=url, dbtable=dbtable).load()
+            self.assertEqual(sorted(df.collect()), sorted(readback.collect()))
+        finally:
+            # Clean up.
+            with self.assertRaisesRegex(Exception, f"Database '{db}' dropped."):
+                self.spark.read.format("jdbc").options(
+                    url=f"{url};drop=true", dbtable=dbtable
+                ).load().collect()
+
 
 class DataSourcesTests(DataSourcesTestsMixin, ReusedSQLTestCase):
     pass

