You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/07 20:04:07 UTC
spark git commit: [SPARK-21648][SQL] Fix confusing assert failure in
JDBC source when parallel fetching parameters are not properly provided.
Repository: spark
Updated Branches:
refs/heads/master cce25b360 -> baf5cac0f
[SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.
### What changes were proposed in this pull request?
```SQL
CREATE TABLE mytesttable1
USING org.apache.spark.sql.jdbc
OPTIONS (
url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}',
dbtable 'mytesttable1',
paritionColumn 'state_id',
lowerBound '0',
upperBound '52',
numPartitions '53',
fetchSize '10000'
)
```
The above option name `paritionColumn` is wrong. That means users did not provide the value for `partitionColumn`. In such cases, users hit a confusing error.
```
AssertionError: assertion failed
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312)
```
### How was this patch tested?
Added a test case
Author: gatorsmile <ga...@gmail.com>
Closes #18864 from gatorsmile/jdbcPartCol.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baf5cac0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baf5cac0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baf5cac0
Branch: refs/heads/master
Commit: baf5cac0f8c35925c366464d7e0eb5f6023fce57
Parents: cce25b3
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Aug 7 13:04:04 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Aug 7 13:04:04 2017 -0700
----------------------------------------------------------------------
.../datasources/jdbc/JDBCOptions.scala | 11 ++++++----
.../datasources/jdbc/JdbcRelationProvider.scala | 9 ++++++--
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 22 ++++++++++++++++++++
.../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 5 +++--
4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 591096d..96a8a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -97,10 +97,13 @@ class JDBCOptions(
val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
// the upper bound of the partition column
val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
- require(partitionColumn.isEmpty ||
- (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
- s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
- s" and '$JDBC_NUM_PARTITIONS' are required.")
+ // numPartitions is also used for data source writing
+ require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
+ (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
+ numPartitions.isDefined),
+ s"When reading JDBC data sources, users need to specify all or none for the following " +
+ s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
+ s"and '$JDBC_NUM_PARTITIONS'")
val fetchSize = {
val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
require(size >= 0,
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 74dcfb0..37e7bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
+ import JDBCOptions._
+
val jdbcOptions = new JDBCOptions(parameters)
val partitionColumn = jdbcOptions.partitionColumn
val lowerBound = jdbcOptions.lowerBound
@@ -36,10 +38,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
val numPartitions = jdbcOptions.numPartitions
val partitionInfo = if (partitionColumn.isEmpty) {
- assert(lowerBound.isEmpty && upperBound.isEmpty)
+ assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " +
+ s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
null
} else {
- assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
+ assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
+ s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
+ s"'$JDBC_NUM_PARTITIONS' are also required")
JDBCPartitioningInfo(
partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 24f46a6..4c43646 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -421,6 +421,28 @@ class JDBCSuite extends SparkFunSuite
assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
}
+ test("Missing partition columns") {
+ withView("tempPeople") {
+ val e = intercept[IllegalArgumentException] {
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW tempPeople
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (
+ | url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
+ | dbtable 'TEST.PEOPLE',
+ | lowerBound '0',
+ | upperBound '52',
+ | numPartitions '53',
+ | fetchSize '10000' )
+ """.stripMargin.replaceAll("\n", " "))
+ }.getMessage
+ assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+ "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+ "'numPartitions'"))
+ }
+ }
+
test("Basic API with FetchSize") {
(0 to 4).foreach { size =>
val properties = new Properties()
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2334d5a..b7f97f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -324,8 +324,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
.option("partitionColumn", "foo")
.save()
}.getMessage
- assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
- " and 'numPartitions' are required."))
+ assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+ "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+ "'numPartitions'"))
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org