You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/21 15:49:58 UTC

spark git commit: [SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables

Repository: spark
Updated Branches:
  refs/heads/master 63f077fbe -> 4c0ff5f58


[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables

## What changes were proposed in this pull request?
Support` ALTER TABLE ADD COLUMNS (...) `syntax for Hive serde and some datasource tables.
In this PR, we consider a few aspects:

1. View is not supported for `ALTER ADD COLUMNS`

2. Since tables created in SparkSQL with Hive DDL syntax will populate table properties with schema information, we need make sure the consistency of the schema before and after ALTER operation in order for future use.

3. For embedded-schema type of format, such as `parquet`, we need to make sure that the predicate on the newly-added columns can be evaluated properly, or pushed down properly. In case of the data file does not have the columns for the newly-added columns, such predicates should return as if the column values are NULLs.

4. For datasource table, this feature does not support the following:
4.1 TEXT format, since there is only one default column `value` is inferred for text format data.
4.2 ORC format, since SparkSQL native ORC reader does not support the difference between user-specified-schema and inferred schema from ORC files.
4.3 Third party datasource types that implements RelationProvider, including the built-in JDBC format, since different implementations by the vendors may have different ways to dealing with schema.
4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive` are supported.

5. Column names being added can not be duplicate of any existing data column or partition column names. Case sensitivity is taken into consideration according to the sql configuration.

6. This feature also supports In-Memory catalog, while Hive support is turned off.
## How was this patch tested?
Add new test cases

Author: Xin Wu <xi...@us.ibm.com>

Closes #16626 from xwu0226/alter_add_columns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c0ff5f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c0ff5f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c0ff5f5

Branch: refs/heads/master
Commit: 4c0ff5f58565f811b65f1a11b6121da007bcbd5f
Parents: 63f077f
Author: Xin Wu <xi...@us.ibm.com>
Authored: Tue Mar 21 08:49:54 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Tue Mar 21 08:49:54 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   3 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  56 +++++++++
 .../catalyst/catalog/SessionCatalogSuite.scala  |  29 +++++
 .../spark/sql/execution/SparkSqlParser.scala    |  16 +++
 .../spark/sql/execution/command/tables.scala    |  76 +++++++++++-
 .../sql/execution/command/DDLCommandSuite.scala |   8 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 122 +++++++++++++++++++
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 100 ++++++++++++++-
 8 files changed, 400 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cc3b8fd..c4a590e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -85,6 +85,8 @@ statement
         LIKE source=tableIdentifier locationSpec?                      #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq)?                      #analyze
+    | ALTER TABLE tableIdentifier
+        ADD COLUMNS '(' columns=colTypeList ')'                        #addTableColumns
     | ALTER (TABLE | VIEW) from=tableIdentifier
         RENAME TO to=tableIdentifier                                   #renameTable
     | ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
-    | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
     | kw1=START kw2=TRANSACTION
     | kw1=COMMIT

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b134fd4..a469d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
       throw new TableAlreadyExistsException(db = db, table = name.table)
     }
   }
+
+  private def checkDuplication(fields: Seq[StructField]): Unit = {
+    val columnNames = if (conf.caseSensitiveAnalysis) {
+      fields.map(_.name)
+    } else {
+      fields.map(_.name.toLowerCase)
+    }
+    if (columnNames.distinct.length != columnNames.length) {
+      val duplicateColumns = columnNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+    }
+  }
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
@@ -296,6 +311,47 @@ class SessionCatalog(
   }
 
   /**
+   * Alter the schema of a table identified by the provided table identifier. The new schema
+   * should still contain the existing bucket columns and partition columns used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+   * as the schema itself).
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table (must contain existing partition and
+   *                  bucket columns, and partition columns need to be at the end)
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
+    requireDbExists(db)
+    requireTableExists(tableIdentifier)
+    checkDuplication(newSchema)
+
+    val catalogTable = externalCatalog.getTable(db, table)
+    val oldSchema = catalogTable.schema
+
+    // not supporting dropping columns yet
+    val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    if (nonExistentColumnNames.nonEmpty) {
+      throw new AnalysisException(
+        s"""
+           |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+           |not present in the new schema. We don't support dropping columns yet.
+         """.stripMargin)
+    }
+
+    // assuming the newSchema has all partition columns at the end as required
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
+  private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+    schema.fields.map(_.name).exists(conf.resolver(_, colName))
+  }
+
+  /**
    * Return whether a table/view with the specified name exists. If no database is specified, check
    * with current database.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fd9e5d6..ca4ce1c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      // construct the expected table schema
+      val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+      assert(newTab.schema == expectedTableSchema)
+    }
+  }
+
+  test("alter table drop columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+      }.getMessage
+      assert(e.contains("We don't support dropping columns yet."))
+    }
+  }
+
   test("get table") {
     withBasicCatalog { catalog =>
       assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index abea7a3..d4f23f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -742,6 +742,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create a [[AlterTableAddColumnsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1
+   *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+   * }}}
+   */
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableAddColumnsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitColTypeList(ctx.columns)
+    )
+  }
+
+  /**
    * Create an [[AlterTableSetPropertiesCommand]] command.
    *
    * For example:

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index beb3dca..93307fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -175,6 +178,77 @@ case class AlterTableRenameCommand(
 }
 
 /**
+ * A command that add columns to a table
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+*/
+case class AlterTableAddColumnsCommand(
+    table: TableIdentifier,
+    columns: Seq[StructField]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
+    catalog.refreshTable(table)
+
+    // make sure any partition columns are at the end of the fields
+    val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+    catalog.alterTableSchema(
+      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
+   * view, or datasource table with text, orc formats or external provider.
+   * For datasource table, it currently only supports parquet, json, csv.
+   */
+  private def verifyAlterTableAddColumn(
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+          |ALTER ADD COLUMNS does not support views.
+          |You must drop and re-create the views for adding the new columns. Views: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+        // For datasource table, this command can only support the following File format.
+        // TextFileFormat only default to one column "value"
+        // OrcFileFormat can not handle difference between user-specified schema and
+        // inferred schema yet. TODO, once this issue is resolved , we can add Orc back.
+        // Hive type is already considered as hive serde table, so the logic will not
+        // come in here.
+        case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+              |ALTER ADD COLUMNS does not support datasource table with type $s.
+              |You must drop and re-create the table for adding the new columns. Tables: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+}
+
+
+/**
  * A command that loads data into a Hive table.
  *
  * The syntax of this command is:

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 4b73b07..13202a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
   }
 
-  test("alter table: add/replace columns (not allowed)") {
-    assertUnsupported(
-      """
-       |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
-       |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
-       |COMMENT 'test_comment2') CASCADE
-      """.stripMargin)
+  test("alter table: replace columns (not allowed)") {
     assertUnsupported(
       """
        |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 235c6bf..648b179 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+        sql("INSERT INTO t1 VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 is null"),
+          Seq(Row(1, null))
+        )
+
+        sql("INSERT INTO t1 VALUES (3, 2)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 2"),
+          Seq(Row(3, 2))
+        )
+      }
+    }
+  }
+
+  supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+    test(s"alter datasource table add columns - partitioned - $provider") {
+      withTable("t1") {
+        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, null, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 is null"),
+          Seq(Row(1, null, 2))
+        )
+        sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c3 = 3"),
+          Seq(Row(2, 3, 1))
+        )
+        checkAnswer(
+          sql("SELECT * FROM t1 WHERE c2 = 1"),
+          Seq(Row(2, 3, 1))
+        )
+      }
+    }
+  }
+
+  test("alter datasource table add columns - text format not supported") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING text")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+      }.getMessage
+      assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+    }
+  }
+
+  test("alter table add columns -- not support temp view") {
+    withTempView("tmp_v") {
+      sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns -- not support view") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+      }
+      assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+    }
+  }
+
+  test("alter table add columns with existing column name") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+      val e = intercept[AnalysisException] {
+        sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s)"))
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("t1") {
+          sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+          if (!caseSensitive) {
+            val e = intercept[AnalysisException] {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e.contains("Found duplicate column(s)"))
+          } else {
+            if (isUsingHiveMetastore) {
+              // hive catalog will still complains that c1 is duplicate column name because hive
+              // identifiers are case insensitive.
+              val e = intercept[AnalysisException] {
+                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              }.getMessage
+              assert(e.contains("HiveException"))
+            } else {
+              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+              assert(spark.table("t1").schema
+                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4c0ff5f5/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d752c41..04bc79d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.sql.types._
 
 // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
 class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
@@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
   import testImplicits._
+  val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
 
   override def afterEach(): Unit = {
     try {
@@ -1860,4 +1861,101 @@ class HiveDDLSuite
       }
     }
   }
+
+  hiveFormats.foreach { tableType =>
+    test(s"alter hive serde table add columns -- partitioned - $tableType") {
+      withTable("tab") {
+        sql(
+          s"""
+             |CREATE TABLE tab (c1 int, c2 int)
+             |PARTITIONED BY (c3 int) STORED AS $tableType
+          """.stripMargin)
+
+        sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)")
+        sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c3 = 1"),
+          Seq(Row(1, 2, null, 1))
+        )
+        assert(spark.table("tab").schema
+          .contains(StructField("c4", IntegerType)))
+        sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)")
+        checkAnswer(
+          spark.table("tab"),
+          Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2))
+        )
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"),
+          Seq(Row(2, 3, 4, 2))
+        )
+
+        sql("ALTER TABLE tab ADD COLUMNS (c5 char(10))")
+        assert(spark.table("tab").schema.find(_.name == "c5")
+          .get.metadata.getString("HIVE_TYPE_STRING") == "char(10)")
+      }
+    }
+  }
+
+  hiveFormats.foreach { tableType =>
+    test(s"alter hive serde table add columns -- with predicate - $tableType ") {
+      withTable("tab") {
+        sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType")
+        sql("INSERT INTO tab VALUES (1, 2)")
+        sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c4 IS NULL"),
+          Seq(Row(1, 2, null))
+        )
+        assert(spark.table("tab").schema
+          .contains(StructField("c4", IntegerType)))
+        sql("INSERT INTO tab VALUES (2, 3, 4)")
+        checkAnswer(
+          sql("SELECT * FROM tab WHERE c4 = 4 "),
+          Seq(Row(2, 3, 4))
+        )
+        checkAnswer(
+          spark.table("tab"),
+          Seq(Row(1, 2, null), Row(2, 3, 4))
+        )
+      }
+    }
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    test(s"alter add columns with existing column name - caseSensitive $caseSensitive") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+        withTable("tab") {
+          sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
+          if (!caseSensitive) {
+            // duplicating partitioning column name
+            val e1 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+            }.getMessage
+            assert(e1.contains("Found duplicate column(s)"))
+
+            // duplicating data column name
+            val e2 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e2.contains("Found duplicate column(s)"))
+          } else {
+            // hive catalog will still complains that c1 is duplicate column name because hive
+            // identifiers are case insensitive.
+            val e1 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+            }.getMessage
+            assert(e1.contains("HiveException"))
+
+            // hive catalog will still complains that c1 is duplicate column name because hive
+            // identifiers are case insensitive.
+            val e2 = intercept[AnalysisException] {
+              sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+            }.getMessage
+            assert(e2.contains("HiveException"))
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org