Posted to commits@spark.apache.org by sa...@apache.org on 2021/02/20 14:46:23 UTC

[spark] branch master updated: [SPARK-34379][SQL] Map JDBC RowID to StringType rather than LongType

This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b33a3  [SPARK-34379][SQL] Map JDBC RowID to StringType rather than LongType
82b33a3 is described below

commit 82b33a304160e4f950de613c3d17f88fa3e75e5e
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sat Feb 20 23:45:56 2021 +0900

    [SPARK-34379][SQL] Map JDBC RowID to StringType rather than LongType
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue where `java.sql.RowId` is mapped to `LongType`; `StringType` should be preferred.
    
    In the current implementation, the JDBC RowID type is mapped to `LongType` except in `OracleDialect`, but there is no guarantee that a RowID can be converted to a long.
    `java.sql.RowId` declares `toString`, and the specification of `java.sql.RowId` says
    
    > _all methods on the RowId interface must be fully implemented if the JDBC driver supports the data type_
    (https://docs.oracle.com/javase/8/docs/api/java/sql/RowId.html)
    
    So, we should prefer `StringType` to `LongType`.
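    
    As a minimal illustration (not part of this patch; the helper name is hypothetical), `toString` is the only conversion that the `RowId` contract obliges every compliant driver to support:
    
    ```scala
    import java.sql.ResultSet
    
    // java.sql.RowId only guarantees equals, hashCode, getBytes and toString,
    // so the string form is the one representation every driver must provide.
    // `column` is the 1-based JDBC column index.
    def readRowIdAsString(rs: ResultSet, column: Int): String =
      rs.getRowId(column).toString
    ```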
    
    ### Why are the changes needed?
    
    This fixes a potential bug: a driver's RowID values are not guaranteed to be convertible to a long.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. RowID is now mapped to `StringType` rather than `LongType`.
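    
    For example (hypothetical URL and table names, not from this patch; assumes an active `SparkSession` in `spark`), a JDBC read that exposes a ROWID column now infers `StringType`:
    
    ```scala
    // Placeholder url/dbtable values for illustration only.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/svc")
      .option("dbtable", "(SELECT ROWID AS row_id, t.* FROM some_table t)")
      .load()
    // StringType with this change (previously LongType for non-Oracle dialects).
    df.schema("row_id").dataType
    ```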
    
    ### How was this patch tested?
    
    A new test was added, and the existing test case `SPARK-32992: map Oracle's ROWID type to StringType` in `OracleIntegrationSuite` still passes.
    
    Closes #31491 from sarutak/rowid-type.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
 docs/sql-migration-guide.md                        |  2 ++
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 20 ++++++++++++-----
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |  6 -----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 26 ++++++++++++++++++++++
 4 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a6101c6..0552e80 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -24,6 +24,8 @@ license: |
 
 ## Upgrading from Spark SQL 3.1 to 3.2
 
+  - Since Spark 3.2, all the supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, the Oracle dialect uses StringType and the other dialects use LongType.
+
   - In Spark 3.2, the PostgreSQL JDBC dialect uses StringType for MONEY, and MONEY[] is not supported because the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.
 
   - In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 2ba8bed..76099aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -226,7 +226,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.REAL          => DoubleType
       case java.sql.Types.REF           => StringType
       case java.sql.Types.REF_CURSOR    => null
-      case java.sql.Types.ROWID         => LongType
+      case java.sql.Types.ROWID         => StringType
       case java.sql.Types.SMALLINT      => IntegerType
       case java.sql.Types.SQLXML        => StringType
       case java.sql.Types.STRUCT        => StringType
@@ -310,11 +310,15 @@ object JdbcUtils extends Logging {
       val metadata = new MetadataBuilder()
       metadata.putLong("scale", fieldScale)
 
-      // SPARK-33888
-      // - include TIME type metadata
-      // - always build the metadata
-      if (dataType == java.sql.Types.TIME) {
-        metadata.putBoolean("logical_time_type", true)
+      dataType match {
+        case java.sql.Types.TIME =>
+          // SPARK-33888
+          // - include TIME type metadata
+          // - always build the metadata
+          metadata.putBoolean("logical_time_type", true)
+        case java.sql.Types.ROWID =>
+          metadata.putBoolean("rowid", true)
+        case _ =>
       }
 
       val columnType =
@@ -448,6 +452,10 @@ object JdbcUtils extends Logging {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setByte(pos, rs.getByte(pos + 1))
 
+    case StringType if metadata.contains("rowid") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, UTF8String.fromString(rs.getRowId(pos + 1).toString))
+
     case StringType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 491b6e2..b741ece 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -64,12 +64,6 @@ private case object OracleDialect extends JdbcDialect {
         => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
-      // scalastyle:off line.size.limit
-      // According to the documentation for Oracle Database 19c:
-      // "Values of the ROWID pseudocolumn are strings representing the address of each row."
-      // https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Data-Types.html#GUID-AEF1FE4C-2DE5-4BE7-BB53-83AD8F1E34EF
-      // scalastyle:on line.size.limit
-      case Types.ROWID => Some(StringType)
       case _ => None
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index cc27211..ff9adc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,6 +25,8 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 import scala.collection.JavaConverters._
 
 import org.h2.jdbc.JdbcSQLException
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkException
@@ -1781,4 +1783,28 @@ class JDBCSuite extends QueryTest
     assert(options.asProperties.get("url") == url)
     assert(options.asProperties.get("dbtable") == "table3")
   }
+
+  test("SPARK-34379: Map JDBC RowID to StringType rather than LongType") {
+    val mockRsmd = mock(classOf[java.sql.ResultSetMetaData])
+    when(mockRsmd.getColumnCount).thenReturn(1)
+    when(mockRsmd.getColumnLabel(anyInt())).thenReturn("rowid")
+    when(mockRsmd.getColumnType(anyInt())).thenReturn(java.sql.Types.ROWID)
+    when(mockRsmd.getColumnTypeName(anyInt())).thenReturn("rowid")
+    when(mockRsmd.getPrecision(anyInt())).thenReturn(0)
+    when(mockRsmd.getScale(anyInt())).thenReturn(0)
+    when(mockRsmd.isSigned(anyInt())).thenReturn(false)
+    when(mockRsmd.isNullable(anyInt())).thenReturn(java.sql.ResultSetMetaData.columnNoNulls)
+
+    val mockRs = mock(classOf[java.sql.ResultSet])
+    when(mockRs.getMetaData).thenReturn(mockRsmd)
+
+    val mockDialect = mock(classOf[JdbcDialect])
+    when(mockDialect.getCatalystType(anyInt(), anyString(), anyInt(), any[MetadataBuilder]))
+      .thenReturn(None)
+
+    val schema = JdbcUtils.getSchema(mockRs, mockDialect)
+    val fields = schema.fields
+    assert(fields.length === 1)
+    assert(fields(0).dataType === StringType)
+  }
 }
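
For reference, a self-contained sketch of the getter selection that the `JdbcUtils.scala` hunk performs (plain JDBC only, not Spark's internal `JDBCValueGetter` machinery; null handling via `rs.wasNull` is omitted for brevity):

```scala
import java.sql.ResultSet

// Standalone illustration, not Spark's actual API: a column tagged with
// the "rowid" metadata flag is materialized through RowId.toString, while
// plain string columns use the ordinary getString path. `pos` is the
// 1-based JDBC column index.
def stringGetter(isRowId: Boolean): (ResultSet, Int) => String =
  if (isRowId) (rs, pos) => rs.getRowId(pos).toString
  else (rs, pos) => rs.getString(pos)
```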


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org