You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/04 02:59:59 UTC

[spark] branch branch-3.4 updated: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0be6b2e6abb [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
0be6b2e6abb is described below

commit 0be6b2e6abb7ef9ce746be71db5f800fb49931a8
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Mar 3 22:59:37 2023 -0400

    [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
    
    ### What changes were proposed in this pull request?
    Currently, the Connect project has a new `DataFrameReader` API that corresponds to Spark's `DataFrameReader` API, but the Connect `DataFrameReader` is missing the `jdbc` API.
    
    ### Why are the changes needed?
    This PR adds the JDBC API to the Connect `DataFrameReader`.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #40252 from beliefer/SPARK-42555.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 41c5e326eb4f45818c8227ab51e729c8402d995d)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameReader.scala     |  66 +++++++++++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  20 ++++++-
 .../query-tests/explain-results/read_jdbc.explain  |   1 +
 .../read_jdbc_with_partition.explain               |   1 +
 .../resources/query-tests/queries/read_jdbc.json   |  14 +++++
 .../query-tests/queries/read_jdbc.proto.bin        | Bin 0 -> 101 bytes
 .../queries/read_jdbc_with_partition.json          |  18 ++++++
 .../queries/read_jdbc_with_partition.proto.bin     | Bin 0 -> 177 bytes
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |  33 +++++++++++
 9 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3e17b03173b..43d6486f124 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
@@ -184,6 +186,70 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
+  /**
+   * Construct a `DataFrame` representing the database table accessible via JDBC URL url named
+   * table and connection properties.
+   *
+   * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC
+   * in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    // properties should override settings in extraOptions.
+    this.extraOptions ++= properties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").load()
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Construct a `DataFrame` representing the database table accessible via JDBC URL url named
+   * table. Partitions of the table will be retrieved in parallel based on the parameters passed
+   * to this function.
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC
+   * in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param table
+   *   Name of the table in the external database.
+   * @param columnName
+   *   Alias of `partitionColumn` option. Refer to `partitionColumn` in <a
+   *   href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   * @param connectionProperties
+   *   JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least
+   *   a "user" and "password" property should be included. "fetchsize" can be used to control the
+   *   number of rows per fetch and "queryTimeout" can be used to wait for a Statement object to
+   *   execute to the given number of seconds.
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      connectionProperties: Properties): DataFrame = {
+    // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
+    this.extraOptions ++= Map(
+      "partitionColumn" -> columnName,
+      "lowerBound" -> lowerBound.toString,
+      "upperBound" -> upperBound.toString,
+      "numPartitions" -> numPartitions.toString)
+    jdbc(url, table, connectionProperties)
+  }
+
   /**
    * Loads a JSON file and returns the results as a `DataFrame`.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 6e9583ae725..e8921ca776d 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql
 
 import java.nio.file.{Files, Path}
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
@@ -235,6 +235,24 @@ class PlanGenerationTestSuite
       .load(testDataPath.resolve("people.csv").toString)
   }
 
+  test("read jdbc") {
+    session.read.jdbc(
+      "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+      "TEST.TIMETYPES",
+      new Properties())
+  }
+
+  test("read jdbc with partition") {
+    session.read.jdbc(
+      "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+      "TEST.EMP",
+      "THEID",
+      0,
+      4,
+      3,
+      new Properties())
+  }
+
   test("read json") {
     session.read.json(testDataPath.resolve("people.json").toString)
   }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
new file mode 100644
index 00000000000..c0e906176b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
@@ -0,0 +1 @@
+Relation [A#0,B#0,C#0] JDBCRelation(TEST.TIMETYPES) [numPartitions=1]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
new file mode 100644
index 00000000000..e3ddb781bd2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
@@ -0,0 +1 @@
+Relation [NAME#0,THEID#0,Dept#0] JDBCRelation(TEST.EMP) [numPartitions=3]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
new file mode 100644
index 00000000000..3e9b7b8cc86
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
@@ -0,0 +1,14 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "dataSource": {
+      "format": "jdbc",
+      "options": {
+        "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+        "dbtable": "TEST.TIMETYPES"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin
new file mode 100644
index 00000000000..4e74a07d22f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
new file mode 100644
index 00000000000..31576cee4f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
@@ -0,0 +1,18 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "dataSource": {
+      "format": "jdbc",
+      "options": {
+        "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+        "upperbound": "4",
+        "lowerbound": "0",
+        "numpartitions": "3",
+        "dbtable": "TEST.EMP",
+        "partitioncolumn": "THEID"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin
new file mode 100644
index 00000000000..c74178148de
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin differ
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index f6e5442bd0c..142ae175090 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connect
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
 import java.nio.file.attribute.BasicFileAttributes
+import java.sql.DriverManager
 import java.util
 
 import scala.util.{Failure, Success, Try}
@@ -36,6 +37,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 // scalastyle:off
 /**
@@ -57,6 +59,37 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 // scalastyle:on
 class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {
+  val url = "jdbc:h2:mem:testdb0"
+  var conn: java.sql.Connection = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    Utils.classForName("org.h2.Driver")
+    // Extra properties that will be specified for our database. We need these to test
+    // usage of parameters from OPTIONS clause in queries.
+    val properties = new util.Properties()
+    properties.setProperty("user", "testUser")
+    properties.setProperty("password", "testPass")
+
+    conn = DriverManager.getConnection(url, properties)
+    conn.prepareStatement("create schema test").executeUpdate()
+    conn
+      .prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))")
+      .executeUpdate()
+    conn
+      .prepareStatement(
+        "create table test.emp(name TEXT(32) NOT NULL," +
+          " theid INTEGER, \"Dept\" INTEGER)")
+      .executeUpdate()
+    conn.commit()
+  }
+
+  override def afterAll(): Unit = {
+    conn.close()
+    super.afterAll()
+  }
+
   override def sparkConf: SparkConf = {
     super.sparkConf
       .set(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org