Posted to commits@spark.apache.org by li...@apache.org on 2017/01/12 05:03:58 UTC

spark git commit: [SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs

Repository: spark
Updated Branches:
  refs/heads/master 43fa21b3e -> 24100f162


[SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs

## What changes were proposed in this pull request?

This PR proposes to throw an `AnalysisException` from the `jdbc` and `table` APIs when a user-specified schema is given, since these APIs either do not support one or would silently ignore it.

**DataFrameReader.jdbc(...)**

```scala
spark.read.schema(StructType(Nil)).jdbc(...)
```

**DataFrameReader.table(...)**

```scala
spark.read.schema(StructType(Nil)).table("usrdb.test")
```
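
Both calls now fail fast with an `AnalysisException` before any data source is touched. A minimal sketch of what a caller observes (assuming a `SparkSession` named `spark`; the H2 URL and table name are placeholders, not part of this patch):

```scala
import java.util.Properties
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType

try {
  // Setting any schema, even an empty one, marks userSpecifiedSchema as defined.
  spark.read.schema(StructType(Nil)).jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", new Properties())
} catch {
  case e: AnalysisException =>
    // The message format comes from the new assertNoSpecifiedSchema helper in the diff below.
    assert(e.getMessage.contains("User specified schema not supported with `jdbc`"))
}
```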

## How was this patch tested?

Unit tests in `JDBCSuite` and `DataFrameReaderWriterSuite`.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14451 from HyukjinKwon/SPARK-16848.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24100f16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24100f16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24100f16

Branch: refs/heads/master
Commit: 24100f162dadb80400cb3e0bc94e4282f10f0c84
Parents: 43fa21b
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Jan 11 21:03:48 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 11 21:03:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala     | 14 ++++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 17 ++++++++++++++++-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 10 ++++++++++
 3 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24100f16/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cd83836..fe34d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
@@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table: String,
       predicates: Array[String],
       connectionProperties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
     val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
     val options = new JDBCOptions(url, table, params)
@@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
+    assertNoSpecifiedSchema("table")
     sparkSession.table(tableName)
   }
 
@@ -540,10 +543,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def textFile(paths: String*): Dataset[String] = {
+    assertNoSpecifiedSchema("textFile")
+    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
+  }
+
+  /**
+   * A convenient function for schema validation in APIs.
+   */
+  private def assertNoSpecifiedSchema(operation: String): Unit = {
     if (userSpecifiedSchema.nonEmpty) {
-      throw new AnalysisException("User specified schema not supported with `textFile`")
+      throw new AnalysisException(s"User specified schema not supported with `$operation`")
     }
-    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
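
The check above was previously inlined in `textFile` only; this hunk consolidates it into `assertNoSpecifiedSchema` and reuses it for `jdbc` and `table`. For JDBC reads the schema is always derived from the remote table's metadata, which is why a user-specified one is rejected. A sketch of the supported, schema-less call (placeholder URL and table name):

```scala
import java.util.Properties

// No .schema(...) call: the JDBC source resolves columns and types
// from the database metadata when the DataFrame is created.
val df = spark.read.jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", new Properties())
df.printSchema()
```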

http://git-wip-us.apache.org/repos/asf/spark/blob/24100f16/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 74ca66b..0396254 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -900,4 +900,19 @@ class JDBCSuite extends SparkFunSuite
     assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
     assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
   }
+
+  test("SPARK-16848: jdbc API throws an exception for user specified schema") {
+    val schema = StructType(Seq(
+      StructField("name", StringType, false), StructField("theid", IntegerType, false)))
+    val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val e1 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
+    }.getMessage
+    assert(e1.contains("User specified schema not supported with `jdbc`"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
+    }.getMessage
+    assert(e2.contains("User specified schema not supported with `jdbc`"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/24100f16/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4bec2e3..8a8ba05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -635,4 +635,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("SPARK-16848: table API throws an exception for user specified schema") {
+    withTable("t") {
+      val schema = StructType(StructField("a", StringType) :: Nil)
+      val e = intercept[AnalysisException] {
+        spark.read.schema(schema).table("t")
+      }.getMessage
+      assert(e.contains("User specified schema not supported with `table`"))
+    }
+  }
 }
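
`table` behaves analogously: the schema always comes from the catalog, never from the reader. A sketch of the supported path (assumes a `SparkSession` named `spark`; the table name is illustrative):

```scala
spark.sql("CREATE TABLE t (a STRING) USING parquet")
val df = spark.read.table("t")  // no .schema(...); resolved from the metastore
df.printSchema()                // root |-- a: string (nullable = true)
spark.sql("DROP TABLE t")
```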

