You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2023/03/10 00:52:02 UTC
[spark] branch branch-3.4 updated: [SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 7e4f8703900 [SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter
7e4f8703900 is described below
commit 7e4f8703900a984be5fd118737eaa31e8ce445b1
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Fri Mar 10 08:51:23 2023 +0800
[SPARK-42733][CONNECT][PYTHON] Fix DataFrameWriter.save to work without path parameter
### What changes were proposed in this pull request?
Fixes `DataFrameWriter.save` to work without a path parameter.
### Why are the changes needed?
`DataFrameWriter.save` should work without a path parameter because some data sources, such as jdbc and noop, work without one.
```py
>>> print(spark.range(10).write.format("noop").mode("append").save())
Traceback (most recent call last):
...
AssertionError: Invalid configuration of WriteCommand, neither path or table present.
```
### Does this PR introduce _any_ user-facing change?
Yes. Data sources that don't need a path parameter will now work.
```py
>>> print(spark.range(10).write.format("noop").mode("append").save())
None
```
### How was this patch tested?
Added a test.
Closes #40356 from ueshin/issues/SPARK-42733/save.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
(cherry picked from commit c2ee08baa98f21f664ec1966d63578346e4eebd8)
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../src/main/protobuf/spark/connect/commands.proto | 6 +++++-
.../sql/connect/planner/SparkConnectPlanner.scala | 1 +
.../sql/connect/planner/SparkConnectProtoSuite.scala | 3 ++-
python/pyspark/sql/connect/plan.py | 4 ----
python/pyspark/sql/tests/connect/test_connect_plan.py | 7 ++++---
python/pyspark/sql/tests/test_datasources.py | 18 ++++++++++++++++++
6 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 5c0718aefeb..3ffbe83bded 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -83,7 +83,11 @@ message WriteOperation {
// (Optional) Format value according to the Spark documentation. Examples are: text, parquet, delta.
optional string source = 2;
- // The destination of the write operation must be either a path or a table.
+ // (Optional)
+ //
+ // The destination of the write operation can be either a path or a table.
+ // If the destination is neither a path nor a table, such as jdbc and noop,
+ // the `save_type` should not be set.
oneof save_type {
string path = 3;
SaveTable table = 4;
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 010d0236c74..f2478f548e7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1723,6 +1723,7 @@ class SparkConnectPlanner(val session: SparkSession) {
}
writeOperation.getSaveTypeCase match {
+ case proto.WriteOperation.SaveTypeCase.SAVETYPE_NOT_SET => w.save()
case proto.WriteOperation.SaveTypeCase.PATH => w.save(writeOperation.getPath)
case proto.WriteOperation.SaveTypeCase.TABLE =>
val tableName = writeOperation.getTable.getTableName
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 241b3dcb825..00ff6ac2fb6 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -554,7 +554,8 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
parameters = Map("columnName" -> "`duplicatedcol`"))
}
- test("Writes fails without path or table") {
+ // TODO(SPARK-42733): Writes without path or table should work.
+ ignore("Writes fails without path or table") {
assertThrows[UnsupportedOperationException] {
transform(localRelation.write())
}
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 8e4db63dc7e..4e31811a9e2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1436,10 +1436,6 @@ class WriteOperation(LogicalPlan):
)
elif self.path is not None:
plan.write_operation.path = self.path
- else:
- raise AssertionError(
- "Invalid configuration of WriteCommand, neither path or table present."
- )
if self.mode is not None:
wm = self.mode.lower()
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan.py b/python/pyspark/sql/tests/connect/test_connect_plan.py
index 115d189b742..d5cffa459d7 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -688,9 +688,10 @@ class SparkConnectPlanTests(PlanOnlyTestFixture):
wo.mode = "overwrite"
wo.source = "parquet"
- # Missing path or table name.
- with self.assertRaises(AssertionError):
- wo.command(None)
+ p = wo.command(None)
+ self.assertIsNotNone(p)
+ self.assertFalse(p.write_operation.HasField("path"))
+ self.assertFalse(p.write_operation.HasField("table"))
wo.path = "path"
p = wo.command(None)
diff --git a/python/pyspark/sql/tests/test_datasources.py b/python/pyspark/sql/tests/test_datasources.py
index 52880ae4631..9b6692a9d21 100644
--- a/python/pyspark/sql/tests/test_datasources.py
+++ b/python/pyspark/sql/tests/test_datasources.py
@@ -17,6 +17,7 @@
import shutil
import tempfile
+import uuid
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StructField, StructType, LongType, StringType
@@ -192,6 +193,23 @@ class DataSourcesTestsMixin:
finally:
shutil.rmtree(path)
+ def test_jdbc(self):
+ db = f"memory:{uuid.uuid4()}"
+ url = f"jdbc:derby:{db}"
+ dbtable = "test_table"
+
+ try:
+ df = self.spark.range(10)
+ df.write.format("jdbc").options(url=f"{url};create=true", dbtable=dbtable).save()
+ readback = self.spark.read.format("jdbc").options(url=url, dbtable=dbtable).load()
+ self.assertEqual(sorted(df.collect()), sorted(readback.collect()))
+ finally:
+ # Clean up.
+ with self.assertRaisesRegex(Exception, f"Database '{db}' dropped."):
+ self.spark.read.format("jdbc").options(
+ url=f"{url};drop=true", dbtable=dbtable
+ ).load().collect()
+
class DataSourcesTests(DataSourcesTestsMixin, ReusedSQLTestCase):
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org