Posted to commits@spark.apache.org by li...@apache.org on 2017/08/07 20:04:07 UTC

spark git commit: [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.

Repository: spark
Updated Branches:
  refs/heads/master cce25b360 -> baf5cac0f


[SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.

### What changes were proposed in this pull request?
```SQL
CREATE TABLE mytesttable1
USING org.apache.spark.sql.jdbc
  OPTIONS (
  url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}',
  dbtable 'mytesttable1',
  paritionColumn 'state_id',
  lowerBound '0',
  upperBound '52',
  numPartitions '53',
  fetchSize '10000'
)
```

The option name `paritionColumn` above is misspelled. That means users did not actually provide a value for `partitionColumn`, and in that case they hit a confusing error:

```
AssertionError: assertion failed
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:156)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312)
```
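For reference, here is the same statement with the option spelled correctly, so that all four parallel-fetch options are provided together. The `${...}` placeholders are the same ones used in the example above:
```SQL
CREATE TABLE mytesttable1
USING org.apache.spark.sql.jdbc
  OPTIONS (
  url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}',
  dbtable 'mytesttable1',
  partitionColumn 'state_id',
  lowerBound '0',
  upperBound '52',
  numPartitions '53',
  fetchSize '10000'
)
```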

### How was this patch tested?
Added a test case

Author: gatorsmile <ga...@gmail.com>

Closes #18864 from gatorsmile/jdbcPartCol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baf5cac0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baf5cac0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baf5cac0

Branch: refs/heads/master
Commit: baf5cac0f8c35925c366464d7e0eb5f6023fce57
Parents: cce25b3
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Aug 7 13:04:04 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Aug 7 13:04:04 2017 -0700

----------------------------------------------------------------------
 .../datasources/jdbc/JDBCOptions.scala          | 11 ++++++----
 .../datasources/jdbc/JdbcRelationProvider.scala |  9 ++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 22 ++++++++++++++++++++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  5 +++--
 4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 591096d..96a8a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -97,10 +97,13 @@ class JDBCOptions(
   val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
   // the upper bound of the partition column
   val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
-  require(partitionColumn.isEmpty ||
-    (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
-    s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
-      s" and '$JDBC_NUM_PARTITIONS' are required.")
+  // numPartitions is also used for data source writing
+  require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
+    (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
+      numPartitions.isDefined),
+    s"When reading JDBC data sources, users need to specify all or none for the following " +
+      s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
+      s"and '$JDBC_NUM_PARTITIONS'")
   val fetchSize = {
     val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
     require(size >= 0,

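The same all-or-none rule applies when the options are set through the public DataFrameReader API. Below is a minimal sketch of a parallel read that satisfies the new check; the URL, credentials, table, and column names are illustrative placeholders borrowed from the example in the description, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: connection details below are placeholders.
object JdbcParallelReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-parallel-read")
      .getOrCreate()

    // All four partitioning options are supplied together, so the
    // require(...) added to JDBCOptions passes and the scan is split
    // into 53 partitions on state_id.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://host:3306/db?user=user&password=pass")
      .option("dbtable", "mytesttable1")
      .option("partitionColumn", "state_id")
      .option("lowerBound", "0")
      .option("upperBound", "52")
      .option("numPartitions", "53")
      .option("fetchsize", "10000")
      .load()

    println(df.rdd.getNumPartitions)  // expect 53

    spark.stop()
  }
}
```

Omitting any one of `lowerBound`, `upperBound`, or `numPartitions` (or misspelling `partitionColumn`, as in the original report) now fails fast with the explicit message above instead of the bare `assertion failed`.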
http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 74dcfb0..37e7bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
+    import JDBCOptions._
+
     val jdbcOptions = new JDBCOptions(parameters)
     val partitionColumn = jdbcOptions.partitionColumn
     val lowerBound = jdbcOptions.lowerBound
@@ -36,10 +38,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val numPartitions = jdbcOptions.numPartitions
 
     val partitionInfo = if (partitionColumn.isEmpty) {
-      assert(lowerBound.isEmpty && upperBound.isEmpty)
+      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " +
+        s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
       null
     } else {
-      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
+      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
+        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
+          s"'$JDBC_NUM_PARTITIONS' are also required")
       JDBCPartitioningInfo(
         partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 24f46a6..4c43646 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -421,6 +421,28 @@ class JDBCSuite extends SparkFunSuite
     assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }
 
+  test("Missing partition columns") {
+    withView("tempPeople") {
+      val e = intercept[IllegalArgumentException] {
+        sql(
+          s"""
+             |CREATE OR REPLACE TEMPORARY VIEW tempPeople
+             |USING org.apache.spark.sql.jdbc
+             |OPTIONS (
+             |  url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
+             |  dbtable 'TEST.PEOPLE',
+             |  lowerBound '0',
+             |  upperBound '52',
+             |  numPartitions '53',
+             |  fetchSize '10000' )
+           """.stripMargin.replaceAll("\n", " "))
+      }.getMessage
+      assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+        "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+        "'numPartitions'"))
+    }
+  }
+
   test("Basic API with FetchSize") {
     (0 to 4).foreach { size =>
       val properties = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/baf5cac0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2334d5a..b7f97f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -324,8 +324,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         .option("partitionColumn", "foo")
         .save()
     }.getMessage
-    assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
-      " and 'numPartitions' are required."))
+    assert(e.contains("When reading JDBC data sources, users need to specify all or none " +
+      "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+      "'numPartitions'"))
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {

