Posted to commits@spark.apache.org by we...@apache.org on 2020/12/22 03:17:31 UTC

[spark] branch branch-3.1 updated: [SPARK-33834][SQL] Verify ALTER TABLE CHANGE COLUMN with Char and Varchar

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 33c7049  [SPARK-33834][SQL] Verify ALTER TABLE CHANGE COLUMN with Char and Varchar
33c7049 is described below

commit 33c70497f49aa516d9433ed04a7db066366d7c1a
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Dec 22 03:07:26 2020 +0000

    [SPARK-33834][SQL] Verify ALTER TABLE CHANGE COLUMN with Char and Varchar
    
    ### What changes were proposed in this pull request?
    
    Verify ALTER TABLE CHANGE COLUMN with char and varchar types to avoid unexpected changes.
    
    For v1 tables, changing the column type is not allowed. This PR fixes a regression where the replaced string type was used instead of the original char/varchar type when altering char/varchar columns.
    
    For v2 tables, the valid changes are (illustrated in the sketch below):
    - char/varchar to string
    - char(x) to char(x)
    - char(x)/varchar(x) to varchar(y) if x <= y
    
    All other changes are invalid.
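    
    A minimal sketch of the matrix, assuming a SparkSession `spark` whose default catalog supports v2 ALTER TABLE (the table, column, and `USING foo` source are illustrative; the sequence applies valid changes step by step, with invalid ones commented out):
    
        // Illustrative only: walking the v2 type-change matrix via Spark SQL.
        spark.sql("CREATE TABLE t(c CHAR(4)) USING foo")
        spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(4)")    // valid: same char length
        // spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)") // invalid: char length must not change
        spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(5)") // valid: char(4) -> varchar(5), 4 <= 5
        // spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)") // invalid: varchar may only widen
        spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE STRING")     // valid: varchar -> string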
    
    ### Why are the changes needed?
    
    ALTER TABLE CHANGE COLUMN with char and varchar types needs verification so that it does not make unexpected changes, such as silently replacing the original char/varchar type.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #30833 from yaooqinn/SPARK-33834.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit f5fd10b1bc519cc05c98f5235fda3d59155cda9d)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  18 ++-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  18 ++-
 .../apache/spark/sql/execution/command/ddl.scala   |   2 +-
 .../execution/command/CharVarcharDDLTestBase.scala | 159 +++++++++++++++++++++
 .../spark/sql/HiveCharVarcharTestSuite.scala       |  24 ++++
 6 files changed, 216 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a5a28a4..d138ff3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3455,7 +3455,8 @@ class Analyzer(override val catalogManager: CatalogManager)
               Some(typeChange)
             } else {
               val (fieldNames, field) = fieldOpt.get
-              if (field.dataType == typeChange.newDataType()) {
+              val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
+              if (dt == typeChange.newDataType()) {
                 // The user didn't want the field to change, so remove this change
                 None
               } else {
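
For context: v1 schemas store char/varchar columns as StringType with the original
type recorded in the field metadata, and the Analyzer change above compares against
that raw type so a no-op CHANGE COLUMN is still detected and dropped. A minimal
sketch of the metadata round trip, using only the CharVarcharUtils helpers visible
in this diff (these are internal catalyst helpers, shown here purely to illustrate
the mechanism):

    import org.apache.spark.sql.catalyst.util.CharVarcharUtils
    import org.apache.spark.sql.types._

    // replaceCharVarcharWithStringInSchema rewrites char/varchar to string but
    // keeps the raw type in the field metadata; getRawType recovers it, which
    // is what the Analyzer now checks before discarding a no-op type change.
    val schema = StructType(Seq(StructField("c", CharType(4))))
    val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
    assert(replaced("c").dataType == StringType)
    assert(CharVarcharUtils.getRawType(replaced("c").metadata).contains(CharType(4)))
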
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 11c4883..3e084f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -513,7 +513,12 @@ trait CheckAnalysis extends PredicateHelper {
                 TypeUtils.failWithIntervalType(add.dataType())
                 colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last
               case update: UpdateColumnType =>
-                val field = findField("update", update.fieldNames)
+                val field = {
+                  val f = findField("update", update.fieldNames)
+                  CharVarcharUtils.getRawType(f.metadata)
+                    .map(dt => f.copy(dataType = dt))
+                    .getOrElse(f)
+                }
                 val fieldName = update.fieldNames.quoted
                 update.newDataType match {
                   case _: StructType =>
@@ -534,7 +539,16 @@ trait CheckAnalysis extends PredicateHelper {
                   case _ =>
                     // update is okay
                 }
-                if (!Cast.canUpCast(field.dataType, update.newDataType)) {
+
+                // Nested types need no handling here; they have already failed the check above.
+                def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
+                  case (CharType(l1), CharType(l2)) => l1 == l2
+                  case (CharType(l1), VarcharType(l2)) => l1 <= l2
+                  case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
+                  case _ => Cast.canUpCast(from, to)
+                }
+
+                if (!canAlterColumnType(field.dataType, update.newDataType)) {
                   alter.failAnalysis(
                     s"Cannot update ${table.name} field $fieldName: " +
                         s"${field.dataType.simpleString} cannot be cast to " +
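
The new check is small enough to restate standalone. A hedged sketch of the same
pattern match, with Cast.canUpCast stubbed out (the stub only models the
atomic-to-string fallthrough that the new tests exercise; the real call also
accepts other safe upcasts such as int to long):

    // Standalone sketch of the char/varchar widening rules added above.
    sealed trait SqlType
    case class CharT(length: Int) extends SqlType
    case class VarcharT(length: Int) extends SqlType
    case object StringT extends SqlType

    def canAlterColumnType(from: SqlType, to: SqlType): Boolean = (from, to) match {
      case (CharT(l1), CharT(l2))       => l1 == l2  // char may only keep its length
      case (CharT(l1), VarcharT(l2))    => l1 <= l2  // char may widen to varchar
      case (VarcharT(l1), VarcharT(l2)) => l1 <= l2  // varchar may only widen
      case (_, StringT)                 => true      // stand-in for Cast.canUpCast
      case _                            => false
    }

    assert(canAlterColumnType(CharT(4), VarcharT(5)))     // widening: accepted
    assert(!canAlterColumnType(VarcharT(4), VarcharT(3))) // narrowing: rejected
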
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0d259c9..466a856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -465,18 +465,28 @@ class SessionCatalog(
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database.
+   * We replace char/varchar with "annotated" string type in the table schema, as the query
+   * engine doesn't support char/varchar yet.
    */
   @throws[NoSuchDatabaseException]
   @throws[NoSuchTableException]
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
+    val t = getTableRawMetadata(name)
+    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+  }
+
+  /**
+   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+   * assume the table/view is in the current database.
+   */
+  @throws[NoSuchDatabaseException]
+  @throws[NoSuchTableException]
+  def getTableRawMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    val t = externalCatalog.getTable(db, table)
-    // We replace char/varchar with "annotated" string type in the table schema, as the query
-    // engine doesn't support char/varchar yet.
-    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+    externalCatalog.getTable(db, table)
   }
 
   /**
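
The split above gives callers two views of the same table: query paths keep the
string-replaced schema, while DDL paths that must round-trip char/varchar exactly
(such as AlterTableChangeColumnCommand below) read the raw one. A hedged usage
sketch, assuming an already-initialized SessionCatalog and an existing table:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}

    // Sketch only: compare the two metadata views of one table.
    def describeBothViews(catalog: SessionCatalog, ident: TableIdentifier): Unit = {
      val forQuery: CatalogTable = catalog.getTableMetadata(ident)  // char(4) appears as string + metadata
      val forDdl: CatalogTable = catalog.getTableRawMetadata(ident) // char(4) preserved as-is
      println(forQuery.schema.simpleString)
      println(forDdl.schema.simpleString)
    }
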
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 27ad620..02d747c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -342,7 +342,7 @@ case class AlterTableChangeColumnCommand(
   // TODO: support change column name/dataType/metadata/position.
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getTableRawMetadata(tableName)
     val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 
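Concretely, this one-line change is the v1 regression fix: the command now sees the
original char/varchar type rather than the replaced string type, so re-declaring a
column keeps its annotation. A hedged illustration, assuming a SparkSession `spark`
and a hypothetical v1 parquet table:

    // Before the fix, the command read the string-replaced schema, so the
    // rewritten column could silently lose its char(4) type; now it round-trips.
    spark.sql("CREATE TABLE t(c CHAR(4)) USING parquet")
    spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(4)") // must stay char(4), not string
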
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
new file mode 100644
index 0000000..748dd7e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.InMemoryPartitionTableCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types._
+
+trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
+
+  def format: String
+
+  def checkColType(f: StructField, dt: DataType): Unit = {
+    assert(f.dataType == CharVarcharUtils.replaceCharVarcharWithString(dt))
+    assert(CharVarcharUtils.getRawType(f.metadata).contains(dt))
+  }
+
+  test("allow to change column for char(x) to char(y), x == y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(4)")
+      checkColType(spark.table("t").schema(1), CharType(4))
+    }
+  }
+
+  test("not allow to change column for char(x) to char(y), x != y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
+      }
+      val v1 = e.getMessage contains "'CharType(4)' to 'c' with type 'CharType(5)'"
+      val v2 = e.getMessage contains "char(4) cannot be cast to char(5)"
+      assert(v1 || v2)
+    }
+  }
+
+  test("not allow to change column from string to char type") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c STRING) USING $format")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
+      }
+      val v1 = e.getMessage contains "'StringType' to 'c' with type 'CharType(5)'"
+      val v2 = e.getMessage contains "string cannot be cast to char(5)"
+      assert(v1 || v2)
+    }
+  }
+
+  test("not allow to change column from int to char type") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i int, c CHAR(4)) USING $format")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN i TYPE CHAR(5)")
+      }
+      val v1 = e.getMessage contains "'IntegerType' to 'i' with type 'CharType(5)'"
+      val v2 = e.getMessage contains "int cannot be cast to char(5)"
+      assert(v1 || v2)
+    }
+  }
+
+  test("allow to change column for varchar(x) to varchar(y), x == y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
+      checkColType(spark.table("t").schema(1), VarcharType(4))
+    }
+  }
+
+  test("not allow to change column for varchar(x) to varchar(y), x > y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
+      }
+      val v1 = e.getMessage contains "'VarcharType(4)' to 'c' with type 'VarcharType(3)'"
+      val v2 = e.getMessage contains "varchar(4) cannot be cast to varchar(3)"
+      assert(v1 || v2)
+    }
+  }
+}
+
+class FileSourceCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with SharedSparkSession {
+  override def format: String = "parquet"
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
+  }
+}
+
+class DSV2CharVarcharDDLTestSuite extends CharVarcharDDLTestBase
+  with SharedSparkSession {
+  override def format: String = "foo"
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.testcat", classOf[InMemoryPartitionTableCatalog].getName)
+      .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
+  }
+
+  test("allow to change column from char to string type") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE STRING")
+      assert(spark.table("t").schema(1).dataType === StringType)
+    }
+  }
+
+  test("allow to change column from char(x) to varchar(y), x <= y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
+      checkColType(spark.table("t").schema(1), VarcharType(4))
+    }
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(5)")
+      checkColType(spark.table("t").schema(1), VarcharType(5))
+    }
+  }
+
+  test("allow to change column from varchar(x) to varchar(y), x <= y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
+      checkColType(spark.table("t").schema(1), VarcharType(4))
+      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(5)")
+      checkColType(spark.table("t").schema(1), VarcharType(5))
+
+    }
+  }
+
+  test("not allow to change column from char(x) to varchar(y), x > y") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
+      }
+      assert(e.getMessage contains "char(4) cannot be cast to varchar(3)")
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 55d305f..f48cfb8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSingleton {
@@ -41,3 +42,26 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
     super.afterAll()
   }
 }
+
+class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {
+
+  // The default Hive serde doesn't support nested null values.
+  override def format: String = "hive OPTIONS(fileFormat='parquet')"
+
+  private var originalPartitionMode = ""
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    originalPartitionMode = spark.conf.get("hive.exec.dynamic.partition.mode", "")
+    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
+  }
+
+  override protected def afterAll(): Unit = {
+    if (originalPartitionMode == "") {
+      spark.conf.unset("hive.exec.dynamic.partition.mode")
+    } else {
+      spark.conf.set("hive.exec.dynamic.partition.mode", originalPartitionMode)
+    }
+    super.afterAll()
+  }
+}

