Posted to commits@spark.apache.org by vi...@apache.org on 2021/10/04 19:58:57 UTC

[spark] branch master updated: [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1676648  [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
1676648 is described below

commit 167664896d4b0ca656bbb2ab6d5a045411e64cd1
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Mon Oct 4 12:57:48 2021 -0700

    [MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
    
    ### What changes were proposed in this pull request?
    Use `SQLConf.resolver` for `caseSensitiveResolution`/`caseInsensitiveResolution` instead of defining a separate helper method at each call site.
    
    ### Why are the changes needed?
    Removes redundant code: the same case-sensitivity branch was duplicated at each call site.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Covered by existing tests.
    
    Closes #34171 from huaxingao/minor.
    
    Authored-by: Huaxin Gao <hu...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../execution/datasources/PartitioningUtils.scala  | 23 +++++-------------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 27 +++++++---------------
 2 files changed, 14 insertions(+), 36 deletions(-)
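
For context, `SQLConf.resolver` already encapsulates exactly the case-sensitivity branch that the deleted helpers reimplemented. A simplified sketch of the relevant catalyst definitions, paraphrased for illustration (not part of this diff):

    // Paraphrased from org.apache.spark.sql.catalyst.analysis and SQLConf;
    // simplified for illustration.
    type Resolver = (String, String) => Boolean

    val caseSensitiveResolution: Resolver = (a, b) => a == b
    val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

    // SQLConf.resolver selects one based on spark.sql.caseSensitive:
    def resolver: Resolver =
      if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution

    // SQLConfHelper gives objects like PartitioningUtils and JdbcUtils a
    // `conf` handle to the active session's SQLConf:
    trait SQLConfHelper {
      def conf: SQLConf = SQLConf.get
    }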

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 184e6317..273dc77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,14 +29,13 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.types.UTF8String
@@ -62,7 +61,7 @@ object PartitionSpec {
   val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
 }
 
-object PartitioningUtils {
+object PartitioningUtils extends SQLConfHelper {
 
   val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
 
@@ -491,7 +490,7 @@ object PartitioningUtils {
     val timestampTry = Try {
       val unescapedRaw = unescapePathName(raw)
       // the inferred data type is consistent with the default timestamp type
-      val timestampType = SQLConf.get.timestampType
+      val timestampType = conf.timestampType
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // TimestampType or TimestampNTZType
       timestampType match {
@@ -556,7 +555,7 @@ object PartitioningUtils {
     SchemaUtils.checkColumnNameDuplication(
       partitionColumns, partitionColumns.mkString(", "), caseSensitive)
 
-    partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+    partitionColumnsSchema(schema, partitionColumns).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
         case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
@@ -570,11 +569,9 @@ object PartitioningUtils {
 
   def partitionColumnsSchema(
       schema: StructType,
-      partitionColumns: Seq[String],
-      caseSensitive: Boolean): StructType = {
-    val equality = columnNameEquality(caseSensitive)
+      partitionColumns: Seq[String]): StructType = {
     StructType(partitionColumns.map { col =>
-      schema.find(f => equality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         val schemaCatalog = schema.catalogString
         throw QueryCompilationErrors.partitionColumnNotFoundInSchemaError(col, schemaCatalog)
       }
@@ -610,14 +607,6 @@ object PartitioningUtils {
     }
   }
 
-  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
-    if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
-  }
-
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by
    * [[findWiderTypeForPartitionColumn]].
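
With the `caseSensitive` parameter removed from `partitionColumnsSchema`, callers no longer thread the flag through; resolution follows the session's `spark.sql.caseSensitive` setting. A hypothetical call, with schema and column names assumed for illustration:

    // Hypothetical example; not part of this diff.
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("ID", IntegerType),
      StructField("ts", TimestampType)))
    // "id" is matched against "ID" via conf.resolver: this succeeds under the
    // default spark.sql.caseSensitive=false and fails when it is set to true.
    val partSchema = PartitioningUtils.partitionColumnsSchema(schema, Seq("id"))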
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7b555bd..d49f4b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localD
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -49,7 +48,7 @@ import org.apache.spark.util.NextIterator
 /**
  * Util functions for JDBC tables.
  */
-object JdbcUtils extends Logging {
+object JdbcUtils extends Logging with SQLConfHelper {
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *
@@ -131,18 +130,13 @@ object JdbcUtils extends Logging {
     val columns = if (tableSchema.isEmpty) {
       rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
     } else {
-      val columnNameEquality = if (isCaseSensitive) {
-        org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-      } else {
-        org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-      }
       // The generated insert statement needs to follow rddSchema's column sequence and
       // tableSchema's column names. When appending data into some case-sensitive DBMSs like
       // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
       // RDD column names for user convenience.
       val tableColumnNames = tableSchema.get.fieldNames
       rddSchema.fields.map { col =>
-        val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
+        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
           throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
         }
         dialect.quoteIdentifier(normalizedName)
@@ -475,7 +469,7 @@ object JdbcUtils extends Logging {
           val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(
             rawTime.toLocalTime().toNanoOfDay())
           val utcTimeMicro = DateTimeUtils.toUTCTime(
-            localTimeMicro, SQLConf.get.sessionLocalTimeZone)
+            localTimeMicro, conf.sessionLocalTimeZone)
           row.setLong(pos, utcTimeMicro)
         } else {
           row.update(pos, null)
@@ -594,7 +588,7 @@ object JdbcUtils extends Logging {
         stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
 
     case TimestampType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
           stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
       } else {
@@ -603,7 +597,7 @@ object JdbcUtils extends Logging {
       }
 
     case DateType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
           stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
       } else {
@@ -812,19 +806,14 @@ object JdbcUtils extends Logging {
       caseSensitive: Boolean,
       createTableColumnTypes: String): Map[String, String] = {
     val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
-    val nameEquality = if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
 
     // checks duplicate columns in the user specified column types.
     SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
+      userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
-      schema.find(f => nameEquality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         throw QueryCompilationErrors.createTableColumnTypesOptionColumnNotFoundInSchemaError(
           col, schema)
       }

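The same delegation applies in `JdbcUtils.getInsertStatement`: matching an RDD column against the table's column names now goes through `conf.resolver`, so the generated INSERT keeps the DBMS's own spelling. A minimal sketch of that lookup, with table and column names hypothetical:

    // Hypothetical sketch of the matching step; names are illustrative.
    import org.apache.spark.sql.internal.SQLConf

    val tableColumnNames = Array("ID", "Name")  // case as stored in the DBMS
    val rddColumn = "id"                        // case as it appears in the DataFrame
    // Under the default spark.sql.caseSensitive=false this finds "ID", so the
    // INSERT statement quotes the table's spelling rather than the RDD's.
    val normalizedName = tableColumnNames.find(f => SQLConf.get.resolver(f, rddColumn))
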
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org