You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2021/11/29 09:07:41 UTC
[spark] branch branch-3.2 updated: [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d1ea322 [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
d1ea322 is described below
commit d1ea322b012eb7d097b8ccff6e30943f9db4be2b
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Nov 29 17:05:56 2021 +0800
[SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
### What changes were proposed in this pull request?
We will store table schema in table properties for the read-side to restore. In Spark 3.1, we add char/varchar support natively. In some commands like `create table`, `alter table` with these types, the `char(x)` or `varchar(x)` will be stored directly to those properties. If a user uses Spark 2 to read such a table it will fail to parse the schema.
FYI, https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L136
A table can be a newly created one by Spark 3.1 and later or an existing one modified by Spark 3.1 and on.
### Why are the changes needed?
backward compatibility
### Does this PR introduce _any_ user-facing change?
That's not necessarily user-facing as a bugfix and only related to internal table properties.
### How was this patch tested?
manully
Closes #34697 from yaooqinn/SPARK-37452.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
(cherry picked from commit 0c3c4e2fd06629b77b86eeb36490ecf07d5283fc)
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../spark/sql/hive/HiveExternalCatalog.scala | 35 ++++++++++++++++------
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 568e814..8551780 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
import org.apache.spark.sql.hive.client.HiveClient
@@ -429,8 +429,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val properties = new mutable.HashMap[String, String]
properties.put(CREATED_SPARK_VERSION, table.createVersion)
+ // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+ // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
+ // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
+ // a type mapping for them in `DataType.nameToType`.
+ // See `restoreHiveSerdeTable` for example.
+ val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
CatalogTable.splitLargeTableProp(
- DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
+ DATASOURCE_SCHEMA,
+ newSchema.json,
+ properties.put,
+ conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
if (partitionColumns.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
@@ -735,8 +744,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case None if table.tableType == VIEW =>
// If this is a view created by Spark 2.2 or higher versions, we should restore its schema
// from table properties.
- CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
- table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
+ getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps =>
+ table = table.copy(schema = schemaFromTableProps)
}
// No provider in table properties, which means this is a Hive serde table.
@@ -786,9 +795,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
// schema from table properties.
- val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
- if (schemaJson.isDefined) {
- val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
+ val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
+ if (maybeSchemaFromTableProps.isDefined) {
+ val schemaFromTableProps = maybeSchemaFromTableProps.get
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
@@ -814,6 +823,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
+ private def getSchemaFromTableProperties(
+ tableProperties: Map[String, String]): Option[StructType] = {
+ CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson =>
+ val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+ CharVarcharUtils.getRawSchema(parsed)
+ }
+ }
+
private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
// Internally we store the table location in storage properties with key "path" for data
// source tables. Here we set the table location to `locationUri` field and filter out the
@@ -828,8 +845,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
- val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
- .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
+ val schemaFromTableProps =
+ getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org