You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2024/03/25 15:51:30 UTC
(spark) branch branch-3.4 updated: [SPARK-47537][SQL][3.4] Fix error data type mapping on MySQL Connector/J
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 77fd58bf8d52 [SPARK-47537][SQL][3.4] Fix error data type mapping on MySQL Connector/J
77fd58bf8d52 is described below
commit 77fd58bf8d5276906c674f3cdcec2715c8520d47
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Mar 25 08:51:20 2024 -0700
[SPARK-47537][SQL][3.4] Fix error data type mapping on MySQL Connector/J
### What changes were proposed in this pull request?
This PR fixes:
- BIT(n>1) is wrongly mapped to boolean instead of long for MySQL Connector/J. This is because we only have a case branch for MariaDB Connector/J, which reports BIT(n>1) columns as VARBINARY, whereas MySQL Connector/J reports them as BIT.
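- The MySQL Docker integration tests were using MariaDB Connector/J, not MySQL Connector/J. They now add `disableMariaDbDriver` to the JDBC URL so that MySQL Connector/J is used. New `MySQLOverMariaConnectorIntegrationSuite` suites keep the coverage for MariaDB Connector/J.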
### Why are the changes needed?
Bugfix
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added new tests.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45691 from yaooqinn/SPARK-47537-BB.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/sql/jdbc/MySQLIntegrationSuite.scala | 47 +++++++++++++++++++++-
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 28 ++++++++++++-
.../spark/sql/jdbc/v2/MySQLNamespaceSuite.scala | 4 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 +++
4 files changed, 79 insertions(+), 5 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index bc202b1b8323..d0fcbfb7aaa8 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -43,7 +43,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
override val usesIpc = false
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&disableMariaDbDriver"
}
override def dataPreparation(conn: Connection): Unit = {
@@ -74,6 +74,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
"'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
}
+ def testConnection(): Unit = {
+ val conn = getConnection()
+ try {
+ assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl")
+ } finally {
+ conn.close()
+ }
+ }
+
+ test("SPARK-47537: ensure use the right jdbc driver") {
+ testConnection()
+ }
+
test("Basic test") {
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
val rows = df.collect()
@@ -193,3 +206,35 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
}
}
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ * ./build/sbt -Pdocker-integration-tests
+ * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+
+ override val db = new DatabaseOnDocker {
+ override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0.31")
+ override val env = Map(
+ "MYSQL_ROOT_PASSWORD" -> "rootpass"
+ )
+ override val usesIpc = false
+ override val jdbcPort: Int = 3306
+ override def getJdbcUrl(ip: String, port: Int): String =
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+ }
+
+ override def testConnection(): Unit = {
+ val conn = getConnection()
+ try {
+ assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection")
+ } finally {
+ conn.close()
+ }
+ }
+}
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 072fdbb3f342..c4056c224f66 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -47,8 +47,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/" +
- s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
+ "&useSSL=false&disableMariaDbDriver"
}
override def sparkConf: SparkConf = super.sparkConf
@@ -128,3 +128,27 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
testStddevPop()
testStddevSamp()
}
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ * ./build/sbt -Pdocker-integration-tests
+ * "docker-integration-tests/testOnly *MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+ override val db = new DatabaseOnDocker {
+ override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0.31")
+ override val env = Map(
+ "MYSQL_ROOT_PASSWORD" -> "rootpass"
+ )
+ override val usesIpc = false
+ override val jdbcPort: Int = 3306
+
+ override def getJdbcUrl(ip: String, port: Int): String =
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
+ "&useSSL=false"
+ }
+}
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
index b73e2b8fd23c..33e457583e3a 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
@@ -45,8 +45,8 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespac
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/" +
- s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true" +
+ "&useSSL=false&disableMariaDbDriver"
}
val map = new CaseInsensitiveStringMap(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 29eb8916bb79..3cea8fa10883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -75,10 +75,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
+ // MariaDB connector behaviour
// This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
// byte arrays instead of longs.
md.putLong("binarylong", 1)
Option(LongType)
+ } else if (sqlType == Types.BIT && size > 1) {
+ // MySQL connector behaviour
+ md.putLong("binarylong", 1)
+ Option(LongType)
} else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
Option(BooleanType)
} else None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org