You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/05/06 04:53:28 UTC
[spark] branch branch-2.4 updated: [SPARK-27596][SQL] The JDBC
'query' option doesn't work for Oracle database
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 771da83 [SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
771da83 is described below
commit 771da83c34247cb1f75ee13b939ee51baa3a11bb
Author: Dilip Biswal <db...@us.ibm.com>
AuthorDate: Sun May 5 21:52:23 2019 -0700
[SPARK-27596][SQL] The JDBC 'query' option doesn't work for Oracle database
## What changes were proposed in this pull request?
**Description from JIRA**
For the JDBC option `query`, we generate an identifier name that starts with an underscore: s"(${subquery}) _SPARK_GEN_JDBC_SUBQUERY_NAME${curId.getAndIncrement()}". This is not supported by Oracle.
Oracle doesn't seem to support identifier names that start with a non-alphabetic character (unless they are quoted), and it has identifier length restrictions as well. [link](https://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements008.htm)
In this PR, the generated alias name 'SPARK_GEN_JDBC_SUBQUERY_NAME&lt;int value&gt;' is changed to remove the "_" prefix, and the alias name is also shortened so that it does not exceed the identifier length limit.
## How was this patch tested?
Tests are added for MySQL, Postgres, Oracle and DB2 to ensure enough coverage.
Closes #24532 from dilipbiswal/SPARK-27596.
Authored-by: Dilip Biswal <db...@us.ibm.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
(cherry picked from commit 6001d476ce663ee6a458535431d30e8213181fcf)
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../spark/sql/jdbc/DB2IntegrationSuite.scala | 26 ++++++++++++++++++++
.../spark/sql/jdbc/MySQLIntegrationSuite.scala | 27 +++++++++++++++++++++
.../spark/sql/jdbc/OracleIntegrationSuite.scala | 28 ++++++++++++++++++++++
.../spark/sql/jdbc/PostgresIntegrationSuite.scala | 26 ++++++++++++++++++++
.../execution/datasources/jdbc/JDBCOptions.scala | 2 +-
5 files changed, 108 insertions(+), 1 deletion(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index f5930bc28..32e56f0 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -158,4 +158,30 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getInt(1) == 20)
assert(rows(0).getString(2) == "1")
}
+
+ test("query JDBC option") {
+ val expectedResult = Set(
+ (42, "fred"),
+ (17, "dave")
+ ).map { case (x, y) =>
+ Row(Integer.valueOf(x), String.valueOf(y))
+ }
+
+ val query = "SELECT x, y FROM tbl WHERE x > 10"
+ // query option to pass on the query string.
+ val df = spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("query", query)
+ .load()
+ assert(df.collect.toSet === expectedResult)
+
+ // query option in the create table path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl', query '$query')
+ """.stripMargin.replaceAll("\n", " "))
+ assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index a70ed98..9cd5c4e 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.util.Properties
+import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.tags.DockerTest
@DockerTest
@@ -152,4 +153,30 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
+
+ test("query JDBC option") {
+ val expectedResult = Set(
+ (42, "fred"),
+ (17, "dave")
+ ).map { case (x, y) =>
+ Row(Integer.valueOf(x), String.valueOf(y))
+ }
+
+ val query = "SELECT x, y FROM tbl WHERE x > 10"
+ // query option to pass on the query string.
+ val df = spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("query", query)
+ .load()
+ assert(df.collect.toSet === expectedResult)
+
+ // query option in the create table path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl', query '$query')
+ """.stripMargin.replaceAll("\n", " "))
+ assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 70d294d..540de0f 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -482,4 +482,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
}
assert(df2.collect.toSet === expectedResult)
}
+
+ test("query JDBC option") {
+ val expectedResult = Set(
+ (1, "1991-11-09", "1996-01-01 01:23:45")
+ ).map { case (id, date, timestamp) =>
+ Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+ }
+
+ val query = "SELECT id, d, t FROM datetime WHERE id = 1"
+ // query option to pass on the query string.
+ val df = spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("query", query)
+ .option("oracle.jdbc.mapDateToTimestamp", "false")
+ .load()
+ assert(df.collect.toSet === expectedResult)
+
+ // query option in the create table path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl',
+ | query '$query',
+ | oracle.jdbc.mapDateToTimestamp false)
+ """.stripMargin.replaceAll("\n", " "))
+ assert(sql("select id, d, t from queryOption").collect.toSet == expectedResult)
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index e8d5b46..7caf3d6 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Connection
import java.util.Properties
import org.apache.spark.sql.Column
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
import org.apache.spark.tags.DockerTest
@@ -180,4 +181,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
}
+
+ test("query JDBC option") {
+ val expectedResult = Set(
+ (42, 123456789012345L)
+ ).map { case (c1, c3) =>
+ Row(Integer.valueOf(c1), java.lang.Long.valueOf(c3))
+ }
+
+ val query = "SELECT c1, c3 FROM bar WHERE c1 IS NOT NULL"
+ // query option to pass on the query string.
+ val df = spark.read.format("jdbc")
+ .option("url", jdbcUrl)
+ .option("query", query)
+ .load()
+ assert(df.collect.toSet === expectedResult)
+
+ // query option in the create table path.
+ sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW queryOption
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl', query '$query')
+ """.stripMargin.replaceAll("\n", " "))
+ assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index b4469cb..d184f3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -87,7 +87,7 @@ class JDBCOptions(
if (subquery.isEmpty) {
throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
} else {
- s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+ s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org