Posted to commits@spark.apache.org by gu...@apache.org on 2023/03/14 09:31:43 UTC

[spark] branch master updated: [SPARK-42733][CONNECT][FOLLOWUP] Write without path or table

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93334e29548 [SPARK-42733][CONNECT][FOLLOWUP] Write without path or table
93334e29548 is described below

commit 93334e295483a0ba66e22d8398512ad970a3ea80
Author: Zhen Li <zh...@users.noreply.github.com>
AuthorDate: Tue Mar 14 18:31:27 2023 +0900

    [SPARK-42733][CONNECT][FOLLOWUP] Write without path or table
    
    ### What changes were proposed in this pull request?
    Fixes `DataFrameWriter.save` to work without a path or table parameter.
    Also adds the `jdbc` method to the writer, as it is one of the implementations that does not take a path or table.
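    
    For illustration, a minimal Scala sketch of the new behavior (mirroring the tests added below; the in-memory Derby URL and the empty properties are placeholders):
    
    ```scala
    import java.util.Properties
    
    // `save()` now works for sources that need neither a path nor a table.
    spark.range(10).write.format("noop").mode("append").save()
    
    // New `jdbc` method on the Spark Connect Scala client's DataFrameWriter.
    val props = new Properties()
    spark.range(10).write.jdbc("jdbc:derby:memory:demo;create=true", "t1", props)
    ```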
    
    ### Why are the changes needed?
    `DataFrameWriter.save` should work without a path or table parameter because some data sources, such as jdbc and noop, work without them.
    This is the follow-up fix for the Scala client of https://github.com/apache/spark/pull/40356.
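    
    Concretely, the client-side validation is relaxed from requiring exactly one of path/table to only forbidding both (quoted from the `DataFrameWriter.scala` hunk below):
    
    ```scala
    // Before: exactly one of path or table had to be set.
    require(builder.hasPath != builder.hasTable)
    
    // After: only setting both is an error, so sources like jdbc and noop can omit both.
    require(!(builder.hasPath && builder.hasTable))
    ```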
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit and E2E tests
    
    Closes #40358 from zhenlineo/write-without-path-table.
    
    Authored-by: Zhen Li <zh...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 37 ++++++++++++++++++++--
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 21 ++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 28 ++++++++++++++++
 .../CheckConnectJvmClientCompatibility.scala       |  4 +--
 .../connect/planner/SparkConnectProtoSuite.scala   | 11 ++++---
 5 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8434addec92..b9d1fefb105 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.util.Locale
+import java.util.{Locale, Properties}
 
 import scala.collection.JavaConverters._
 
@@ -228,7 +228,9 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
 
     // Set path or table
     f(builder)
-    require(builder.hasPath != builder.hasTable) // Only one can be set
+
+    // Cannot both be set
+    require(!(builder.hasPath && builder.hasTable))
 
     builder.setMode(mode match {
       case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
@@ -345,6 +347,37 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     })
   }
 
+  /**
+   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
+   * table already exists in the external database, behavior of this function depends on the save
+   * mode, specified by the `mode` function (default to throwing an exception).
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * JDBC-specific option and parameter documentation for storing tables via JDBC in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param table
+   *   Name of the table in the external database.
+   * @param connectionProperties
+   *   JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least
+   *   a "user" and "password" property should be included. "batchsize" can be used to control the
+   *   number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
+   *   "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
+   *   transaction isolation levels defined by JDBC's Connection object, with default of
+   *   "READ_UNCOMMITTED".
+   * @since 3.4.0
+   */
+  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    // connectionProperties should override settings in extraOptions.
+    this.extraOptions ++= connectionProperties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").save()
+  }
+
   /**
    * Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/"> JSON
    * Lines text format or newline-delimited JSON</a>) at the specified path. This is equivalent
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 60bb23516b0..5aa5500116d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -22,6 +22,7 @@ import java.nio.file.Files
 import scala.collection.JavaConverters._
 
 import io.grpc.StatusRuntimeException
+import java.util.Properties
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
@@ -175,6 +176,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }
   }
 
+  test("write without table or path") {
+    // Should receive no error to write noop
+    spark.range(10).write.format("noop").mode("append").save()
+  }
+
+  test("write jdbc") {
+    val url = "jdbc:derby:memory:1234"
+    val table = "t1"
+    try {
+      spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
+      val result = spark.read.jdbc(url = url, table, new Properties()).collect()
+      assert(result.length == 10)
+    } finally {
+      // clean up
+      assertThrows[StatusRuntimeException] {
+        spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
+      }
+    }
+  }
+
   test("writeTo with create and using") {
     // TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
     //  in SparkConnectProtoSuite.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 42376db880b..e5738fe7acd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import io.grpc.Server
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import java.util.Properties
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
@@ -100,6 +101,33 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(actualPlan.equals(expectedPlan))
   }
 
+  test("write jdbc") {
+    val df = ss.newDataFrame(_ => ()).limit(10)
+
+    val builder = proto.WriteOperation.newBuilder()
+    builder
+      .setInput(df.plan.getRoot)
+      .setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
+      .setSource("jdbc")
+      .putOptions("a", "b")
+      .putOptions("1", "2")
+      .putOptions("url", "url")
+      .putOptions("dbtable", "table")
+
+    val expectedPlan = proto.Plan
+      .newBuilder()
+      .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+      .build()
+
+    val connectionProperties = new Properties
+    connectionProperties.put("a", "b")
+    connectionProperties.put("1", "2")
+    df.write.jdbc("url", "table", connectionProperties)
+
+    val actualPlan = service.getAndClearLatestInputPlan()
+    assert(actualPlan.equals(expectedPlan))
+  }
+
   test("write V2") {
     val df = ss.newDataFrame(_ => ()).limit(10)
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index ae6c6c86fec..97d130421a2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -130,9 +130,7 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
       // DataFrame Reader & Writer
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated
 
       // DataFrameNaFunctions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 9cc714d630b..824ee7aceb4 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 
 import com.google.protobuf.ByteString
 
-import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.{SparkClassNotFoundException, SparkIllegalArgumentException}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Expression
 import org.apache.spark.connect.proto.Join.JoinType
@@ -554,13 +554,16 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
-  // TODO(SPARK-42733): Writes without path or table should work.
-  ignore("Writes fails without path or table") {
-    assertThrows[UnsupportedOperationException] {
+  test("Writes fails without path or table") {
+    assertThrows[SparkIllegalArgumentException] {
       transform(localRelation.write())
     }
   }
 
+  test("Writes without path or table") {
+    transform(localRelation.write(format = Some("noop"), mode = Some("Append")))
+  }
+
   test("Write fails with unknown table - AnalysisException") {
     val cmd = readRel.write(tableName = Some("dest"))
     assertThrows[AnalysisException] {

