You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2021/11/29 09:07:41 UTC

[spark] branch branch-3.2 updated: [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d1ea322  [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
d1ea322 is described below

commit d1ea322b012eb7d097b8ccff6e30943f9db4be2b
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Nov 29 17:05:56 2021 +0800

    [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
    
    ### What changes were proposed in this pull request?
    
    We will store table schema in table properties for the read-side to restore. In Spark 3.1, we add char/varchar support natively. In some commands like `create table`, `alter table` with these types,  the `char(x)` or `varchar(x)` will be stored directly to those properties. If a user uses Spark 2 to read such a table it will fail to parse the schema.
    
    FYI, https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L136
    
    A table can be a newly created one by Spark 3.1 and later or an existing one modified by Spark 3.1 and on.
    
    ### Why are the changes needed?
    
    backward compatibility
    
    ### Does this PR introduce _any_ user-facing change?
    
    That's not necessarily user-facing as a bugfix and only related to internal table properties.
    
    ### How was this patch tested?
    
    manully
    
    Closes #34697 from yaooqinn/SPARK-37452.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
    (cherry picked from commit 0c3c4e2fd06629b77b86eeb36490ecf07d5283fc)
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../spark/sql/hive/HiveExternalCatalog.scala       | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 568e814..8551780 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -429,8 +429,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
 
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
+    // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
+    // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
+    // a type mapping for them in `DataType.nameToType`.
+    // See `restoreHiveSerdeTable` for example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     CatalogTable.splitLargeTableProp(
-      DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
+      DATASOURCE_SCHEMA,
+      newSchema.json,
+      properties.put,
+      conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
 
     if (partitionColumns.nonEmpty) {
       properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
@@ -735,8 +744,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       case None if table.tableType == VIEW =>
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
         // from table properties.
-        CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
-          table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
+        getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps =>
+          table = table.copy(schema = schemaFromTableProps)
         }
 
       // No provider in table properties, which means this is a Hive serde table.
@@ -786,9 +795,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
-    val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-    if (schemaJson.isDefined) {
-      val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
+    val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
+    if (maybeSchemaFromTableProps.isDefined) {
+      val schemaFromTableProps = maybeSchemaFromTableProps.get
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
@@ -814,6 +823,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  private def getSchemaFromTableProperties(
+      tableProperties: Map[String, String]): Option[StructType] = {
+    CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson =>
+      val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+      CharVarcharUtils.getRawSchema(parsed)
+    }
+  }
+
   private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
     // Internally we store the table location in storage properties with key "path" for data
     // source tables. Here we set the table location to `locationUri` field and filter out the
@@ -828,8 +845,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-      .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
+    val schemaFromTableProps =
+      getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org