Posted to commits@spark.apache.org by vi...@apache.org on 2021/10/04 19:58:57 UTC
[spark] branch master updated: [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1676648 [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
1676648 is described below
commit 167664896d4b0ca656bbb2ab6d5a045411e64cd1
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Mon Oct 4 12:57:48 2021 -0700
[MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
### What changes were proposed in this pull request?
Use `SQLConf.resolver` for `caseSensitiveResolution`/`caseInsensitiveResolution` instead of duplicating the selection logic in a private helper at each call site.
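For context, `SQLConf.resolver` already encapsulates exactly this choice: it returns `caseSensitiveResolution` or `caseInsensitiveResolution` depending on `spark.sql.caseSensitive`, and the `SQLConfHelper` trait exposes the active conf as `conf`. A minimal sketch of the resulting call pattern (the object name here is invented for illustration):

    import org.apache.spark.sql.catalyst.SQLConfHelper

    // Extending SQLConfHelper provides `conf` (the active SQLConf), and
    // conf.resolver is already the right resolution function for the
    // current spark.sql.caseSensitive setting.
    object ResolverSketch extends SQLConfHelper {
      def matches(a: String, b: String): Boolean = conf.resolver(a, b)
    }

    // With the default spark.sql.caseSensitive=false:
    //   ResolverSketch.matches("UserId", "userid")   // true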
### Why are the changes needed?
Removes redundant code: the same case-sensitivity branch was duplicated in `PartitioningUtils` and `JdbcUtils`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests; this is a pure refactoring with no behavior change.
Closes #34171 from huaxingao/minor.
Authored-by: Huaxin Gao <hu...@apple.com>
Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
.../execution/datasources/PartitioningUtils.scala | 23 +++++-------------
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 27 +++++++---------------
2 files changed, 14 insertions(+), 36 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 184e6317..273dc77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,14 +29,13 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.unsafe.types.UTF8String
@@ -62,7 +61,7 @@ object PartitionSpec {
val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
}
-object PartitioningUtils {
+object PartitioningUtils extends SQLConfHelper {
val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
@@ -491,7 +490,7 @@ object PartitioningUtils {
val timestampTry = Try {
val unescapedRaw = unescapePathName(raw)
// the inferred data type is consistent with the default timestamp type
- val timestampType = SQLConf.get.timestampType
+ val timestampType = conf.timestampType
// try and parse the date, if no exception occurs this is a candidate to be resolved as
// TimestampType or TimestampNTZType
timestampType match {
@@ -556,7 +555,7 @@ object PartitioningUtils {
SchemaUtils.checkColumnNameDuplication(
partitionColumns, partitionColumns.mkString(", "), caseSensitive)
- partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+ partitionColumnsSchema(schema, partitionColumns).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
@@ -570,11 +569,9 @@ object PartitioningUtils {
def partitionColumnsSchema(
schema: StructType,
- partitionColumns: Seq[String],
- caseSensitive: Boolean): StructType = {
- val equality = columnNameEquality(caseSensitive)
+ partitionColumns: Seq[String]): StructType = {
StructType(partitionColumns.map { col =>
- schema.find(f => equality(f.name, col)).getOrElse {
+ schema.find(f => conf.resolver(f.name, col)).getOrElse {
val schemaCatalog = schema.catalogString
throw QueryCompilationErrors.partitionColumnNotFoundInSchemaError(col, schemaCatalog)
}
@@ -610,14 +607,6 @@ object PartitioningUtils {
}
}
- private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
- if (caseSensitive) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
- }
-
/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by
* [[findWiderTypeForPartitionColumn]].
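For illustration, the lookup `partitionColumnsSchema` now performs, sketched against a made-up schema (the column names are hypothetical):

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("EventDate", StringType)))

    // conf.resolver honors spark.sql.caseSensitive, so under the default
    // case-insensitive analysis this finds the "EventDate" field.
    val field = schema.find(f => SQLConf.get.resolver(f.name, "eventdate"))
    // field == Some(StructField("EventDate", StringType, true))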
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7b555bd..d49f4b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localD
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -49,7 +48,7 @@ import org.apache.spark.util.NextIterator
/**
* Util functions for JDBC tables.
*/
-object JdbcUtils extends Logging {
+object JdbcUtils extends Logging with SQLConfHelper {
/**
* Returns a factory for creating connections to the given JDBC URL.
*
@@ -131,18 +130,13 @@ object JdbcUtils extends Logging {
val columns = if (tableSchema.isEmpty) {
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
} else {
- val columnNameEquality = if (isCaseSensitive) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
// The generated insert statement needs to follow rddSchema's column sequence and
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
// RDD column names for user convenience.
val tableColumnNames = tableSchema.get.fieldNames
rddSchema.fields.map { col =>
- val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
+ val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
}
dialect.quoteIdentifier(normalizedName)
@@ -475,7 +469,7 @@ object JdbcUtils extends Logging {
val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(
rawTime.toLocalTime().toNanoOfDay())
val utcTimeMicro = DateTimeUtils.toUTCTime(
- localTimeMicro, SQLConf.get.sessionLocalTimeZone)
+ localTimeMicro, conf.sessionLocalTimeZone)
row.setLong(pos, utcTimeMicro)
} else {
row.update(pos, null)
@@ -594,7 +588,7 @@ object JdbcUtils extends Logging {
stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
case TimestampType =>
- if (SQLConf.get.datetimeJava8ApiEnabled) {
+ if (conf.datetimeJava8ApiEnabled) {
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
} else {
@@ -603,7 +597,7 @@ object JdbcUtils extends Logging {
}
case DateType =>
- if (SQLConf.get.datetimeJava8ApiEnabled) {
+ if (conf.datetimeJava8ApiEnabled) {
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
} else {
@@ -812,19 +806,14 @@ object JdbcUtils extends Logging {
caseSensitive: Boolean,
createTableColumnTypes: String): Map[String, String] = {
val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
- val nameEquality = if (caseSensitive) {
- org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
- } else {
- org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
- }
// checks duplicate columns in the user specified column types.
SchemaUtils.checkColumnNameDuplication(
- userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
+ userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
// checks if user specified column names exist in the DataFrame schema
userSchema.fieldNames.foreach { col =>
- schema.find(f => nameEquality(f.name, col)).getOrElse {
+ schema.find(f => conf.resolver(f.name, col)).getOrElse {
throw QueryCompilationErrors.createTableColumnTypesOptionColumnNotFoundInSchemaError(
col, schema)
}
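For illustration, the column-name normalization `JdbcUtils` applies when building an INSERT statement, sketched with invented table and RDD column names:

    import org.apache.spark.sql.internal.SQLConf

    val tableColumnNames = Seq("Id", "UserName")
    val rddColumnNames   = Seq("id", "username")

    // Map RDD column names onto the table's exact (possibly case-sensitive)
    // column names, as JdbcUtils does before quoting them for the statement.
    val normalized = rddColumnNames.map { col =>
      tableColumnNames.find(f => SQLConf.get.resolver(f, col)).getOrElse {
        sys.error(s"Column $col not found in table schema")
      }
    }
    // normalized == Seq("Id", "UserName") under the default setting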