You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/12/02 03:15:38 UTC
spark git commit: [SPARK-18538][SQL][BACKPORT-2.1] Fix Concurrent
Table Fetching Using DataFrameReader JDBC APIs
Repository: spark
Updated Branches:
refs/heads/branch-2.1 2f91b0154 -> b9eb10043
[SPARK-18538][SQL][BACKPORT-2.1] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs
### What changes were proposed in this pull request?
#### This PR is to backport https://github.com/apache/spark/pull/15975 to Branch 2.1
---
The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.
```Scala
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
```
```Scala
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
```
This PR fixes these issues. To verify the behavior correctness, we improve the plan output of `EXPLAIN` command by adding `numPartitions` in the `JDBCRelation` node.
Before the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
After the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
### How was this patch tested?
Added verification logic to all the test cases for JDBC concurrent fetching.
Author: gatorsmile <ga...@gmail.com>
Closes #16111 from gatorsmile/jdbcFix2.1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9eb1004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9eb1004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9eb1004
Branch: refs/heads/branch-2.1
Commit: b9eb10043129defa53c5bdfd1190fe68c0107b3b
Parents: 2f91b01
Author: gatorsmile <ga...@gmail.com>
Authored: Fri Dec 2 11:15:26 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Dec 2 11:15:26 2016 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameReader.scala | 37 +++++------
.../datasources/jdbc/JDBCRelation.scala | 3 +-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 67 ++++++++++++++------
3 files changed, 69 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b9eb1004/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1af2f9a..365b50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -159,7 +159,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
- jdbc(url, table, JDBCRelation.columnPartition(null), properties)
+ // properties should override settings in extraOptions.
+ this.extraOptions = this.extraOptions ++ properties.asScala
+ // explicit url and dbtable should override all
+ this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+ format("jdbc").load()
}
/**
@@ -177,7 +181,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @param upperBound the maximum value of `columnName` used to decide partition stride.
* @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive),
* `upperBound` (exclusive), form partition strides for generated WHERE
- * clause expressions used to split the column `columnName` evenly.
+ * clause expressions used to split the column `columnName` evenly. When
+ * the input is less than 1, the number is set to 1.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included. "fetchsize" can be used to control the
@@ -192,9 +197,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
- val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
- val parts = JDBCRelation.columnPartition(partitioning)
- jdbc(url, table, parts, connectionProperties)
+ // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
+ this.extraOptions ++= Map(
+ JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
+ JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
+ JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
+ JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
+ jdbc(url, table, connectionProperties)
}
/**
@@ -220,22 +229,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {
+ // connectionProperties should override settings in extraOptions.
+ val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+ val options = new JDBCOptions(url, table, params)
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
- jdbc(url, table, parts, connectionProperties)
- }
-
- private def jdbc(
- url: String,
- table: String,
- parts: Array[Partition],
- connectionProperties: Properties): DataFrame = {
- // connectionProperties should override settings in extraOptions.
- this.extraOptions = this.extraOptions ++ connectionProperties.asScala
- // explicit url and dbtable should override all
- this.extraOptions += ("url" -> url, "dbtable" -> table)
- format("jdbc").load()
+ val relation = JDBCRelation(parts, options)(sparkSession)
+ sparkSession.baseRelationToDataFrame(relation)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b9eb1004/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 6abb27d..5ca1c75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -138,7 +138,8 @@ private[sql] case class JDBCRelation(
}
override def toString: String = {
+ val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
// credentials should not be included in the plan output, table information is sufficient.
- s"JDBCRelation(${jdbcOptions.table})"
+ s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b9eb1004/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index af5f01c..aa1ab14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,12 +24,12 @@ import java.util.{Calendar, GregorianCalendar, Properties}
import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRDD, JdbcUtils}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRDD, JDBCRelation, JdbcUtils}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -224,6 +224,16 @@ class JDBCSuite extends SparkFunSuite
conn.close()
}
+ // Check whether the tables are fetched in the expected degree of parallelism
+ def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
+ val jdbcRelations = df.queryExecution.analyzed.collect {
+ case LogicalRelation(r: JDBCRelation, _, _) => r
+ }
+ assert(jdbcRelations.length == 1)
+ assert(jdbcRelations.head.parts.length == expectedNumPartitions,
+ s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but got:`$jdbcRelations`")
+ }
+
test("SELECT *") {
assert(sql("SELECT * FROM foobar").collect().size === 3)
}
@@ -328,13 +338,23 @@ class JDBCSuite extends SparkFunSuite
}
test("SELECT * partitioned") {
- assert(sql("SELECT * FROM parts").collect().size == 3)
+ val df = sql("SELECT * FROM parts")
+ checkNumPartitions(df, expectedNumPartitions = 3)
+ assert(df.collect().length == 3)
}
test("SELECT WHERE (simple predicates) partitioned") {
- assert(sql("SELECT * FROM parts WHERE THEID < 1").collect().size === 0)
- assert(sql("SELECT * FROM parts WHERE THEID != 2").collect().size === 2)
- assert(sql("SELECT THEID FROM parts WHERE THEID = 1").collect().size === 1)
+ val df1 = sql("SELECT * FROM parts WHERE THEID < 1")
+ checkNumPartitions(df1, expectedNumPartitions = 3)
+ assert(df1.collect().length === 0)
+
+ val df2 = sql("SELECT * FROM parts WHERE THEID != 2")
+ checkNumPartitions(df2, expectedNumPartitions = 3)
+ assert(df2.collect().length === 2)
+
+ val df3 = sql("SELECT THEID FROM parts WHERE THEID = 1")
+ checkNumPartitions(df3, expectedNumPartitions = 3)
+ assert(df3.collect().length === 1)
}
test("SELECT second field partitioned") {
@@ -385,24 +405,27 @@ class JDBCSuite extends SparkFunSuite
}
test("Partitioning via JDBCPartitioningInfo API") {
- assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties())
- .collect().length === 3)
+ val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties())
+ checkNumPartitions(df, expectedNumPartitions = 3)
+ assert(df.collect().length === 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
- assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
- .collect().length === 3)
+ val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
+ checkNumPartitions(df, expectedNumPartitions = 2)
+ assert(df.collect().length === 3)
}
test("Partitioning on column that might have null values.") {
- assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties())
- .collect().length === 4)
- assert(
- spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties())
- .collect().length === 4)
+ val df = spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties())
+ checkNumPartitions(df, expectedNumPartitions = 3)
+ assert(df.collect().length === 4)
+
+ val df2 = spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties())
+ checkNumPartitions(df2, expectedNumPartitions = 3)
+ assert(df2.collect().length === 4)
+
// partitioning on a nullable quoted column
assert(
spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties())
@@ -419,6 +442,7 @@ class JDBCSuite extends SparkFunSuite
numPartitions = 0,
connectionProperties = new Properties()
)
+ checkNumPartitions(res, expectedNumPartitions = 1)
assert(res.count() === 8)
}
@@ -432,6 +456,7 @@ class JDBCSuite extends SparkFunSuite
numPartitions = 10,
connectionProperties = new Properties()
)
+ checkNumPartitions(res, expectedNumPartitions = 4)
assert(res.count() === 8)
}
@@ -445,6 +470,7 @@ class JDBCSuite extends SparkFunSuite
numPartitions = 4,
connectionProperties = new Properties()
)
+ checkNumPartitions(res, expectedNumPartitions = 1)
assert(res.count() === 8)
}
@@ -465,7 +491,9 @@ class JDBCSuite extends SparkFunSuite
}
test("SELECT * on partitioned table with a nullable partition column") {
- assert(sql("SELECT * FROM nullparts").collect().size == 4)
+ val df = sql("SELECT * FROM nullparts")
+ checkNumPartitions(df, expectedNumPartitions = 3)
+ assert(df.collect().length == 4)
}
test("H2 integral types") {
@@ -739,7 +767,8 @@ class JDBCSuite extends SparkFunSuite
}
// test the JdbcRelation toString output
df.queryExecution.analyzed.collect {
- case r: LogicalRelation => assert(r.relation.toString == "JDBCRelation(TEST.PEOPLE)")
+ case r: LogicalRelation =>
+ assert(r.relation.toString == "JDBCRelation(TEST.PEOPLE) [numPartitions=3]")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org