Posted to commits@spark.apache.org by li...@apache.org on 2019/05/06 04:53:28 UTC

[spark] branch branch-2.4 updated: [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 771da83  [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
771da83 is described below

commit 771da83c34247cb1f75ee13b939ee51baa3a11bb
Author: Dilip Biswal <db...@us.ibm.com>
AuthorDate: Sun May 5 21:52:23 2019 -0700

    [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
    
    ## What changes were proposed in this pull request?
    **Description from JIRA**
    For the JDBC option `query`, we generate a subquery alias that starts with an underscore: s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}". This is not supported by Oracle.
    Oracle does not appear to support identifier names that start with a non-alphabetic character (unless they are quoted), and it also imposes a length limit on identifiers. [link](https://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements008.htm)
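    
    A user hits this simply by using the `query` option against Oracle; a minimal sketch, assuming an active SparkSession `spark` (the URL below is a hypothetical placeholder):
    
    ```scala
    // Any query passed through the `query` option triggers the alias
    // generation described above; nothing Oracle-specific is needed here.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//host:1521/service") // hypothetical
      .option("query", "SELECT x, y FROM tbl WHERE x > 10")
      .load()
    ```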
    
    In this PR, the generated alias is changed to `SPARK_GEN_SUBQ_<int value>`: the "_" prefix is removed and the name is shortened so that it stays within the identifier length limit.
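    
    A minimal sketch of the alias generation before and after this change (`curId` here is an illustrative stand-in for the counter in `JDBCOptions`, and the subquery text is made up):
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    val curId = new AtomicLong(0) // stand-in for JDBCOptions.curId
    val subquery = "SELECT x, y FROM tbl WHERE x > 10"
    
    // Before: the unquoted alias starts with "_", which Oracle rejects, and
    // the long name risks exceeding Oracle's identifier length limit.
    val before = s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.get()}"
    
    // After: starts with a letter and stays well within the length limit.
    val after = s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
    ```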
    
    ## How was this patch tested?
    Tests are added for MySQL, PostgreSQL, Oracle and DB2 to ensure enough coverage.
    
    Closes #24532 from dilipbiswal/SPARK-27596.
    
    Authored-by: Dilip Biswal <db...@us.ibm.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
    (cherry picked from commit 6001d476ce663ee6a458535431d30e8213181fcf)
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../spark/sql/jdbc/DB2IntegrationSuite.scala       | 26 ++++++++++++++++++++
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     | 27 +++++++++++++++++++++
 .../spark/sql/jdbc/OracleIntegrationSuite.scala    | 28 ++++++++++++++++++++++
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  | 26 ++++++++++++++++++++
 .../execution/datasources/jdbc/JDBCOptions.scala   |  2 +-
 5 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index f5930bc28..32e56f0 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -158,4 +158,30 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getInt(1) == 20)
     assert(rows(0).getString(2) == "1")
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index a70ed98..9cd5c4e 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
@@ -152,4 +153,30 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+
+    val query = "SELECT x, y FROM tbl WHERE x > 10"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 70d294d..540de0f 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -482,4 +482,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     }
     assert(df2.collect.toSet === expectedResult)
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (1, "1991-11-09", "1996-01-01 01:23:45")
+    ).map { case (id, date, timestamp) =>
+      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    val query = "SELECT id, d, t FROM datetime WHERE id = 1"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .option("oracle.jdbc.mapDateToTimestamp", "false")
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl',
+         |   query '$query',
+         |   oracle.jdbc.mapDateToTimestamp false)
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select id, d, t from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index e8d5b46..7caf3d6 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Connection
 import java.util.Properties
 
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
 import org.apache.spark.tags.DockerTest
@@ -180,4 +181,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
     assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
   }
+
+  test("query JDBC option") {
+    val expectedResult = Set(
+      (42, 123456789012345L)
+    ).map { case (c1, c3) =>
+      Row(Integer.valueOf(c1), java.lang.Long.valueOf(c3))
+    }
+
+    val query = "SELECT c1, c3 FROM bar WHERE c1 IS NOT NULL"
+    // query option to pass on the query string.
+    val df = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    assert(df.collect.toSet === expectedResult)
+
+    // query option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW queryOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$jdbcUrl', query '$query')
+       """.stripMargin.replaceAll("\n", " "))
+    assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index b4469cb..d184f3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -87,7 +87,7 @@ class JDBCOptions(
       if (subquery.isEmpty) {
         throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
       } else {
-        s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
       }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org