Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/04 02:59:59 UTC
[spark] branch branch-3.4 updated: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0be6b2e6abb [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
0be6b2e6abb is described below
commit 0be6b2e6abb7ef9ce746be71db5f800fb49931a8
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Mar 3 22:59:37 2023 -0400
[SPARK-42555][CONNECT] Add JDBC to DataFrameReader
### What changes were proposed in this pull request?
Currently, the Connect project has a new `DataFrameReader` API that corresponds to the Spark `DataFrameReader` API, but the Connect `DataFrameReader` is missing the `jdbc` methods.
### Why are the changes needed?
This PR adds the `jdbc` read methods to the Connect `DataFrameReader`.
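After this change, the Connect Scala client supports the same two `jdbc` read overloads as the classic API. A minimal usage sketch, assuming a Connect `SparkSession` named `spark` (the H2 URL, table names, and credentials are illustrative placeholders taken from the new tests):

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "testUser")
props.setProperty("password", "testPass")

// Plain read: the whole table is read as a single partition.
val df = spark.read.jdbc("jdbc:h2:mem:testdb0", "TEST.TIMETYPES", props)

// Partitioned read: splits the THEID range [0, 4) across 3 partitions.
val partitioned = spark.read.jdbc(
  "jdbc:h2:mem:testdb0",
  "TEST.EMP",
  "THEID", // partitionColumn
  0L,      // lowerBound
  4L,      // upperBound
  3,       // numPartitions
  props)
```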
### Does this PR introduce _any_ user-facing change?
No. This is a new feature.
### How was this patch tested?
New test cases.
Closes #40252 from beliefer/SPARK-42555.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit 41c5e326eb4f45818c8227ab51e729c8402d995d)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../org/apache/spark/sql/DataFrameReader.scala | 66 +++++++++++++++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 20 ++++++-
.../query-tests/explain-results/read_jdbc.explain | 1 +
.../read_jdbc_with_partition.explain | 1 +
.../resources/query-tests/queries/read_jdbc.json | 14 +++++
.../query-tests/queries/read_jdbc.proto.bin | Bin 0 -> 101 bytes
.../queries/read_jdbc_with_partition.json | 18 ++++++
.../queries/read_jdbc_with_partition.proto.bin | Bin 0 -> 177 bytes
.../sql/connect/ProtoToParsedPlanTestSuite.scala | 33 +++++++++++
9 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3e17b03173b..43d6486f124 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import java.util.Properties
+
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
@@ -184,6 +186,70 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
}
}
+ /**
+ * Construct a `DataFrame` representing the database table named `table`, accessible via the
+ * JDBC URL `url` using the given connection `properties`.
+ *
+ * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC
+ * in <a
+ * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @since 3.4.0
+ */
+ def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+ // properties should override settings in extraOptions.
+ this.extraOptions ++= properties.asScala
+ // explicit url and dbtable should override all
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+ format("jdbc").load()
+ }
+
+ // scalastyle:off line.size.limit
+ /**
+ * Construct a `DataFrame` representing the database table named `table`, accessible via the
+ * JDBC URL `url`. Partitions of the table will be retrieved in parallel based on the
+ * parameters passed to this function.
+ *
+ * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+ * your external database systems.
+ *
+ * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC
+ * in <a
+ * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @param table
+ * Name of the table in the external database.
+ * @param columnName
+ * Alias of `partitionColumn` option. Refer to `partitionColumn` in <a
+ * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ * @param connectionProperties
+ * JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally at
+ * least a "user" and "password" property should be included. "fetchsize" can be used to control
+ * the number of rows per fetch, and "queryTimeout" can be used to limit the number of seconds a
+ * Statement object is allowed to execute.
+ * @since 3.4.0
+ */
+ // scalastyle:on line.size.limit
+ def jdbc(
+ url: String,
+ table: String,
+ columnName: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int,
+ connectionProperties: Properties): DataFrame = {
+ // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
+ this.extraOptions ++= Map(
+ "partitionColumn" -> columnName,
+ "lowerBound" -> lowerBound.toString,
+ "upperBound" -> upperBound.toString,
+ "numPartitions" -> numPartitions.toString)
+ jdbc(url, table, connectionProperties)
+ }
+
/**
* Loads a JSON file and returns the results as a `DataFrame`.
*
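For intuition about the partitioned `jdbc` overload above: the JDBC source turns `columnName`, `lowerBound`, `upperBound`, and `numPartitions` into one stride-based `WHERE` predicate per partition, and each partition then issues its own query. A simplified, illustrative sketch of that splitting, assuming the usual stride behavior rather than quoting Spark's exact internals (which also clamp oversized strides):

```scala
// Simplified sketch: split [lowerBound, upperBound) into numPartitions
// stride-based predicates on `column`. Illustrative only, not Spark's code.
def columnPredicates(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Seq[String] = {
  if (numPartitions <= 1) {
    Seq("1 = 1") // a single partition reads the whole table unfiltered
  } else {
    val stride = math.max((upperBound - lowerBound) / numPartitions, 1L)
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL" // open lower edge
      else if (i == numPartitions - 1) s"$column >= $lo" // open upper edge
      else s"$column >= $lo AND $column < $hi"
    }
  }
}

// For the values used in the new tests, columnPredicates("THEID", 0, 4, 3)
// yields: "THEID < 1 OR THEID IS NULL", "THEID >= 1 AND THEID < 2", "THEID >= 2"
```

Note that the bounds only steer the partitioning: because the first and last predicates are open-ended, rows outside `[lowerBound, upperBound)` are still read.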
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 6e9583ae725..e8921ca776d 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
import java.nio.file.{Files, Path}
-import java.util.Collections
+import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
@@ -235,6 +235,24 @@ class PlanGenerationTestSuite
.load(testDataPath.resolve("people.csv").toString)
}
+ test("read jdbc") {
+ session.read.jdbc(
+ "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+ "TEST.TIMETYPES",
+ new Properties())
+ }
+
+ test("read jdbc with partition") {
+ session.read.jdbc(
+ "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+ "TEST.EMP",
+ "THEID",
+ 0,
+ 4,
+ 3,
+ new Properties())
+ }
+
test("read json") {
session.read.json(testDataPath.resolve("people.json").toString)
}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
new file mode 100644
index 00000000000..c0e906176b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
@@ -0,0 +1 @@
+Relation [A#0,B#0,C#0] JDBCRelation(TEST.TIMETYPES) [numPartitions=1]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
new file mode 100644
index 00000000000..e3ddb781bd2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
@@ -0,0 +1 @@
+Relation [NAME#0,THEID#0,Dept#0] JDBCRelation(TEST.EMP) [numPartitions=3]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
new file mode 100644
index 00000000000..3e9b7b8cc86
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
@@ -0,0 +1,14 @@
+{
+ "common": {
+ "planId": "0"
+ },
+ "read": {
+ "dataSource": {
+ "format": "jdbc",
+ "options": {
+ "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+ "dbtable": "TEST.TIMETYPES"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin
new file mode 100644
index 00000000000..4e74a07d22f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
new file mode 100644
index 00000000000..31576cee4f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
@@ -0,0 +1,18 @@
+{
+ "common": {
+ "planId": "0"
+ },
+ "read": {
+ "dataSource": {
+ "format": "jdbc",
+ "options": {
+ "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+ "upperbound": "4",
+ "lowerbound": "0",
+ "numpartitions": "3",
+ "dbtable": "TEST.EMP",
+ "partitioncolumn": "THEID"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin
new file mode 100644
index 00000000000..c74178148de
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin differ
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index f6e5442bd0c..142ae175090 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connect
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
+import java.sql.DriverManager
import java.util
import scala.util.{Failure, Success, Try}
@@ -36,6 +37,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
// scalastyle:off
/**
@@ -57,6 +59,37 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
// scalastyle:on
class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {
+ val url = "jdbc:h2:mem:testdb0"
+ var conn: java.sql.Connection = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ Utils.classForName("org.h2.Driver")
+ // Extra properties that will be specified for our database. We need these to test the
+ // usage of parameters from the OPTIONS clause in queries.
+ val properties = new util.Properties()
+ properties.setProperty("user", "testUser")
+ properties.setProperty("password", "testPass")
+
+ conn = DriverManager.getConnection(url, properties)
+ conn.prepareStatement("create schema test").executeUpdate()
+ conn
+ .prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))")
+ .executeUpdate()
+ conn
+ .prepareStatement(
+ "create table test.emp(name TEXT(32) NOT NULL," +
+ " theid INTEGER, \"Dept\" INTEGER)")
+ .executeUpdate()
+ conn.commit()
+ }
+
+ override def afterAll(): Unit = {
+ conn.close()
+ super.afterAll()
+ }
+
override def sparkConf: SparkConf = {
super.sparkConf
.set(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org