Posted to commits@spark.apache.org by li...@apache.org on 2017/10/23 16:55:52 UTC

spark git commit: [SPARK-22303][SQL] Handle Oracle specific jdbc types in OracleDialect

Repository: spark
Updated Branches:
  refs/heads/master 57accf6e3 -> 5a5b6b785


[SPARK-22303][SQL] Handle Oracle specific jdbc types in OracleDialect

TIMESTAMP WITH TIME ZONE (-101), BINARY_DOUBLE (101) and BINARY_FLOAT (100) are now handled in OracleDialect.
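
In sketch form, the new mapping looks like this (condensed; the full change, including the existing NUMERIC handling, appears in the OracleDialect diff below):

```
import org.apache.spark.sql.types._

// Oracle-specific JDBC type codes (values taken from the patch below).
val BINARY_FLOAT = 100    // OracleTypes.BINARY_FLOAT
val BINARY_DOUBLE = 101   // OracleTypes.BINARY_DOUBLE
val TIMESTAMPTZ = -101    // Oracle TIMESTAMP WITH TIME ZONE

// Condensed sketch of the mapping added to OracleDialect.getCatalystType.
def mapOracleType(sqlType: Int): Option[DataType] = sqlType match {
  case TIMESTAMPTZ   => Some(TimestampType)
  case BINARY_FLOAT  => Some(FloatType)
  case BINARY_DOUBLE => Some(DoubleType)
  case _             => None
}
```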

## What changes were proposed in this pull request?

When an Oracle table contains columns of type BINARY_FLOAT or BINARY_DOUBLE, Spark SQL fails to load the table with a SQLException:

```
java.sql.SQLException: Unsupported type 101
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:235)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:292)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:291)
 at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:64)
 at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
 at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:306)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
```
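
For reference, a minimal repro sketch of the failure (the connection URL, credentials, and table name are hypothetical; any Oracle table with a BINARY_FLOAT or BINARY_DOUBLE column triggers it):

```
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("repro").getOrCreate()

val props = new Properties()
props.setProperty("user", "scott")      // hypothetical credentials
props.setProperty("password", "tiger")

// Before this patch, resolving the schema of a table declared as, e.g.,
//   CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)
// failed with "java.sql.SQLException: Unsupported type 101".
val df = spark.read.jdbc(
  "jdbc:oracle:thin:@//localhost:1521/XE", "oracle_types", props)
df.printSchema()
```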

## How was this patch tested?

I updated a unit test to cover type conversion for types -101, 100, and 101. On top of that, I tested this change against an actual table with those columns and verified that Spark was able to read from and write to it.
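
In sketch form, the round-trip check looks like the following (it mirrors the new integration test in the diff below; the JDBC URL is hypothetical):

```
import java.util.Properties
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("roundtrip").getOrCreate()
val jdbcUrl = "jdbc:oracle:thin:@//localhost:1521/XE" // hypothetical
val tableName = "oracle_types" // declared as (d BINARY_DOUBLE, f BINARY_FLOAT)

val schema = StructType(Seq(
  StructField("d", DoubleType, true),
  StructField("f", FloatType, true)))
val data = spark.sparkContext.parallelize(Seq(Row(1.1, 2.2f)))

// Append a row; the dialect maps DoubleType/FloatType back to the Oracle types.
spark.createDataFrame(data, schema)
  .write.mode(SaveMode.Append).jdbc(jdbcUrl, tableName, new Properties)

// Read it back: the columns now resolve to DoubleType and FloatType.
val dfRead = spark.read.jdbc(jdbcUrl, tableName, new Properties)
assert(dfRead.schema.map(_.dataType) == Seq(DoubleType, FloatType))
```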

Author: Kohki Nishio <ta...@me.com>

Closes #19548 from taroplus/oracle_sql_types_101.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a5b6b78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a5b6b78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a5b6b78

Branch: refs/heads/master
Commit: 5a5b6b78517b526771ee5b579d56aa1daa4b3ef1
Parents: 57accf6
Author: Kohki Nishio <ta...@me.com>
Authored: Mon Oct 23 09:55:46 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Oct 23 09:55:46 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 43 ++++++++++++++++---
 .../execution/datasources/jdbc/JdbcUtils.scala  |  1 -
 .../apache/spark/sql/jdbc/OracleDialect.scala   | 44 +++++++++++---------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  6 +++
 4 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a5b6b78/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 7680ae3..9034318 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -21,7 +21,7 @@ import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -52,7 +52,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
   import testImplicits._
 
   override val db = new DatabaseOnDocker {
-    override val imageName = "wnameless/oracle-xe-11g:14.04.4"
+    override val imageName = "wnameless/oracle-xe-11g:16.04"
     override val env = Map(
       "ORACLE_ROOT_PASSWORD" -> "oracle"
     )
@@ -104,15 +104,18 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
       """.stripMargin.replaceAll("\n", " "))
 
 
-    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))").executeUpdate();
+    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))").executeUpdate()
     conn.prepareStatement(
-      "INSERT INTO numerics VALUES (4, 1.23, 9999999999)").executeUpdate();
-    conn.commit();
+      "INSERT INTO numerics VALUES (4, 1.23, 9999999999)").executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)").executeUpdate()
+    conn.commit()
   }
 
 
   test("SPARK-16625 : Importing Oracle numeric types") {
-    val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties);
+    val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties)
     val rows = df.collect()
     assert(rows.size == 1)
     val row = rows(0)
@@ -307,4 +310,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(values.getInt(1).equals(1))
     assert(values.getBoolean(2).equals(false))
   }
+
+  test("SPARK-22303: handle BINARY_DOUBLE and BINARY_FLOAT as DoubleType and FloatType") {
+    val tableName = "oracle_types"
+    val schema = StructType(Seq(
+      StructField("d", DoubleType, true),
+      StructField("f", FloatType, true)))
+    val props = new Properties()
+
+    // write it back to the table (append mode)
+    val data = spark.sparkContext.parallelize(Seq(Row(1.1, 2.2f)))
+    val dfWrite = spark.createDataFrame(data, schema)
+    dfWrite.write.mode(SaveMode.Append).jdbc(jdbcUrl, tableName, props)
+
+    // read records from oracle_types
+    val dfRead = sqlContext.read.jdbc(jdbcUrl, tableName, new Properties)
+    val rows = dfRead.collect()
+    assert(rows.size == 1)
+
+    // check data types
+    val types = dfRead.schema.map(field => field.dataType)
+    assert(types(0).equals(DoubleType))
+    assert(types(1).equals(FloatType))
+
+    // check values
+    val values = rows(0)
+    assert(values.getDouble(0) === 1.1)
+    assert(values.getFloat(1) === 2.2f)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5b6b78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7113366..9debc4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -230,7 +230,6 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                         => TimestampType
-      case -101                         => TimestampType // Value for Timestamp with Time Zone in Oracle
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5b6b78/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 3b44c1d..e3f106c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -23,30 +23,36 @@ import org.apache.spark.sql.types._
 
 
 private case object OracleDialect extends JdbcDialect {
+  private[jdbc] val BINARY_FLOAT = 100
+  private[jdbc] val BINARY_DOUBLE = 101
+  private[jdbc] val TIMESTAMPTZ = -101
 
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.NUMERIC) {
-      val scale = if (null != md) md.build().getLong("scale") else 0L
-      size match {
-        // Handle NUMBER fields that have no precision/scale in special way
-        // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
-        // For more details, please see
-        // https://github.com/apache/spark/pull/8780#issuecomment-145598968
-        // and
-        // https://github.com/apache/spark/pull/8780#issuecomment-144541760
-        case 0 => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
-        // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts
-        // this to NUMERIC with -127 scale
-        // Not sure if there is a more robust way to identify the field as a float (or other
-        // numeric types that do not specify a scale.
-        case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
-        case _ => None
-      }
-    } else {
-      None
+    sqlType match {
+      case Types.NUMERIC =>
+        val scale = if (null != md) md.build().getLong("scale") else 0L
+        size match {
+          // Handle NUMBER fields that have no precision/scale in special way
+          // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
+          // For more details, please see
+          // https://github.com/apache/spark/pull/8780#issuecomment-145598968
+          // and
+          // https://github.com/apache/spark/pull/8780#issuecomment-144541760
+          case 0 => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+          // Handle FLOAT fields in a special way because JDBC ResultSetMetaData converts
+          // this to NUMERIC with -127 scale
+          // Not sure if there is a more robust way to identify the field as a float (or other
+          // numeric types that do not specify a scale.
+          case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
+          case _ => None
+        }
+      case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
+      case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
+      case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
+      case _ => None
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a5b6b78/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 34205e0..167b3e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -815,6 +815,12 @@ class JDBCSuite extends SparkFunSuite
       Some(DecimalType(DecimalType.MAX_PRECISION, 10)))
     assert(oracleDialect.getCatalystType(java.sql.Types.NUMERIC, "numeric", 0, null) ==
       Some(DecimalType(DecimalType.MAX_PRECISION, 10)))
+    assert(oracleDialect.getCatalystType(OracleDialect.BINARY_FLOAT, "BINARY_FLOAT", 0, null) ==
+      Some(FloatType))
+    assert(oracleDialect.getCatalystType(OracleDialect.BINARY_DOUBLE, "BINARY_DOUBLE", 0, null) ==
+      Some(DoubleType))
+    assert(oracleDialect.getCatalystType(OracleDialect.TIMESTAMPTZ, "TIMESTAMP", 0, null) ==
+      Some(TimestampType))
   }
 
   test("table exists query by jdbc dialect") {

