You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/04 16:44:32 UTC

spark git commit: [SPARK-19726][SQL] Failed to insert null timestamp value to mysql using spark jdbc

Repository: spark
Updated Branches:
  refs/heads/master 29b1f6b09 -> a3c29fcbb


[SPARK-19726][SQL] Failed to insert null timestamp value to mysql using spark jdbc

## What changes were proposed in this pull request?

When creating a table like the following:
> create table timestamp_test(id int(11), time_stamp timestamp not null default current_timestamp);

The result of executing "insert into timestamp_test values (111, null)" is different between Spark and JDBC.
```
mysql> select * from timestamp_test;
+------+---------------------+
| id   | time_stamp          |
+------+---------------------+
|  111 | 1970-01-01 00:00:00 | -> spark
|  111 | 2017-06-27 19:32:38 | -> mysql
+------+---------------------+
2 rows in set (0.00 sec)
```
   This is because, in such a case, ```StructField.nullable``` is false, so the code generated by ```InvokeLike``` and ```BoundReference``` does not check whether the field is null. Instead, it directly uses ```CodegenContext.INPUT_ROW.getLong(1)```; however, ```UnsafeRow.setNullAt(1)``` puts 0 in the underlying memory, so the null value is read back as 0, which is the 1970-01-01 00:00:00 timestamp shown above.

   This PR will ```always``` set ```StructField.nullable``` to true after obtaining metadata from the JDBC connection, since we can insert null into a NOT NULL timestamp column in MySQL. In this way, Spark will propagate the null to the underlying DB engine and let the DB choose how to process NULL.

## How was this patch tested?

Added tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: YIHAODIAN\wangshuangshuang <wa...@yihaodian.com>
Author: Shuangshuang Wang <ws...@gmail.com>

Closes #18445 from shuangshuangwang/SPARK-19726.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3c29fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3c29fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3c29fcb

Branch: refs/heads/master
Commit: a3c29fcbbda02c1528b4185bcb880c91077d480c
Parents: 29b1f6b
Author: YIHAODIAN\wangshuangshuang <wa...@yihaodian.com>
Authored: Tue Jul 4 09:44:27 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jul 4 09:44:27 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala  |  2 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala      | 12 ++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala      |  8 ++++++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0f53b5c..57e9bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -59,7 +59,7 @@ object JDBCRDD extends Logging {
       try {
         val rs = statement.executeQuery()
         try {
-          JdbcUtils.getSchema(rs, dialect)
+          JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
         } finally {
           rs.close()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index ca61c2e..55b2539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -290,7 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)

http://git-wip-us.apache.org/repos/asf/spark/blob/a3c29fcb/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bf1fd16..92f50a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org