Posted to commits@spark.apache.org by we...@apache.org on 2017/11/02 11:37:58 UTC

spark git commit: [SPARK-22306][SQL][2.2] alter table schema should not erase the bucketing metadata at hive side

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c311c5e79 -> 4074ed2e1


[SPARK-22306][SQL][2.2] alter table schema should not erase the bucketing metadata at hive side

## What changes were proposed in this pull request?

When we alter a table's schema, we set the new schema on the Spark `CatalogTable`, convert it to a Hive table, and finally call `hive.alterTable`. This causes a problem in Spark 2.2 because Hive bucketing metadata is not recognized by Spark, which means a Spark `CatalogTable` representing a Hive table is always non-bucketed; so when we convert it back to a Hive table and call `hive.alterTable`, the original Hive bucketing metadata is removed.
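
To make the failure mode concrete, here is a minimal, hypothetical sketch of the pre-fix flow (the helper `alterSchemaOldWay` and its call site are illustrative only, and it glosses over the fact that `HiveClient` is package-private):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.types.StructType

// Hypothetical helper reproducing the old ALTER TABLE flow, for illustration only.
def alterSchemaOldWay(
    catalog: ExternalCatalog,
    client: HiveClient,
    db: String,
    table: String,
    newSchema: StructType): Unit = {
  // In Spark 2.2 a Hive-bucketed table comes back with bucketSpec = None,
  // because CatalogTable cannot represent Hive bucketing metadata.
  val sparkTable: CatalogTable = catalog.getTable(db, table)

  // The old path set the new schema on the Spark-side object ...
  val withNewSchema = sparkTable.copy(schema = newSchema)

  // ... and wrote the whole object back. Converting this non-bucketed
  // CatalogTable to a Hive table produces no CLUSTERED BY / SORTED BY info,
  // so hive.alterTable silently erases the original bucketing metadata.
  client.alterTable(withNewSchema)
}
```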

To fix this bug, we should read out the raw Hive table metadata, update its schema, and call `hive.alterTable`. By doing this we can guarantee that only the schema is changed, and nothing else.
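
That is essentially what the new `HiveClientImpl.alterTableDataSchema` in the patch below does; a condensed sketch of it (written as a method of `HiveClientImpl`, so `client`, `shim`, `toHiveColumn` and `withHiveState` are in scope; error handling and the removal of stale schema properties are omitted):

```scala
override def alterTableDataSchema(
    dbName: String,
    tableName: String,
    newDataSchema: StructType,
    schemaProps: Map[String, String]): Unit = withHiveState {
  // Read the raw Hive table, so bucketing, SerDe and storage info stay untouched.
  val oldTable = client.getTable(dbName, tableName)

  // Replace only the data columns ...
  oldTable.setFields(newDataSchema.map(toHiveColumn).asJava)

  // ... and refresh the Spark-specific schema table properties.
  schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) }

  // Write the modified raw table back, leaving everything else as it was.
  shim.alterTable(client, s"$dbName.$tableName", oldTable)
}
```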

Note that this bug doesn't exist in the master branch, because we've added Hive bucketing support there and the Hive bucketing metadata can be recognized by Spark. I think we should merge this PR to master too, as a code cleanup and to reduce the difference between master and branch-2.2 for backporting.

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <we...@databricks.com>

Closes #19622 from cloud-fan/infer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4074ed2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4074ed2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4074ed2e

Branch: refs/heads/branch-2.2
Commit: 4074ed2e1363c886878bbf9483e21abd1745f482
Parents: c311c5e
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Nov 2 12:37:52 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Nov 2 12:37:52 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  | 12 ++---
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  7 +--
 .../sql/catalyst/catalog/SessionCatalog.scala   | 25 ++++-----
 .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++--
 .../catalyst/catalog/SessionCatalogSuite.scala  | 21 ++++++--
 .../spark/sql/execution/command/tables.scala    | 10 +---
 .../spark/sql/hive/HiveExternalCatalog.scala    | 57 +++++++++++++-------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 11 ++--
 .../spark/sql/hive/client/HiveClient.scala      | 11 ++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 45 ++++++++++------
 .../sql/hive/HiveExternalCatalogSuite.scala     | 18 +++++++
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  4 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala  |  2 +-
 13 files changed, 148 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 18644b0..8db6f79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -148,17 +148,15 @@ abstract class ExternalCatalog
   def alterTable(tableDefinition: CatalogTable): Unit
 
   /**
-   * Alter the schema of a table identified by the provided database and table name. The new schema
-   * should still contain the existing bucket columns and partition columns used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided database and table name. The new
+   * data schema should not have conflict column names with the existing partition columns, and
+   * should still contain all the existing data columns.
    *
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
-   * @param schema Updated schema to be used for the table (must contain existing partition and
-   *               bucket columns)
+   * @param newDataSchema Updated data schema to be used for the table.
    */
-  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+  def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
   def getTable(db: String, table: String): CatalogTable
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index bf8542c..f83e28f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -301,13 +301,14 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
   }
 
-  override def alterTableSchema(
+  override def alterTableDataSchema(
       db: String,
       table: String,
-      schema: StructType): Unit = synchronized {
+      newDataSchema: StructType): Unit = synchronized {
     requireTableExists(db, table)
     val origTable = catalog(db).tables(table).table
-    catalog(db).tables(table).table = origTable.copy(schema = schema)
+    val newSchema = StructType(newDataSchema ++ origTable.partitionSchema)
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
   override def getTable(db: String, table: String): CatalogTable = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index df8f9aa..bbcfdac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -336,30 +336,28 @@ class SessionCatalog(
   }
 
   /**
-   * Alter the schema of a table identified by the provided table identifier. The new schema
-   * should still contain the existing bucket columns and partition columns used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided table identifier. The new data
+   * schema should not have conflict column names with the existing partition columns, and should
+   * still contain all the existing data columns.
    *
    * @param identifier TableIdentifier
-   * @param newSchema Updated schema to be used for the table (must contain existing partition and
-   *                  bucket columns, and partition columns need to be at the end)
+   * @param newDataSchema Updated data schema to be used for the table
    */
-  def alterTableSchema(
+  def alterTableDataSchema(
       identifier: TableIdentifier,
-      newSchema: StructType): Unit = {
+      newDataSchema: StructType): Unit = {
     val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(identifier.table)
     val tableIdentifier = TableIdentifier(table, Some(db))
     requireDbExists(db)
     requireTableExists(tableIdentifier)
-    checkDuplication(newSchema)
 
     val catalogTable = externalCatalog.getTable(db, table)
-    val oldSchema = catalogTable.schema
-
+    checkDuplication(newDataSchema ++ catalogTable.partitionSchema)
+    val oldDataSchema = catalogTable.dataSchema
     // not supporting dropping columns yet
-    val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    val nonExistentColumnNames =
+      oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
     if (nonExistentColumnNames.nonEmpty) {
       throw new AnalysisException(
         s"""
@@ -368,8 +366,7 @@ class SessionCatalog(
          """.stripMargin)
     }
 
-    // assuming the newSchema has all partition columns at the end as required
-    externalCatalog.alterTableSchema(db, table, newSchema)
+    externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
   private def columnNameResolved(schema: StructType, colName: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 54ecf44..014d0c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,15 +245,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val tbl1 = catalog.getTable("db2", "tbl1")
-    val newSchema = StructType(Seq(
+    val newDataSchema = StructType(Seq(
       StructField("new_field_1", IntegerType),
-      StructField("new_field_2", StringType),
-      StructField("a", IntegerType),
-      StructField("b", StringType)))
-    catalog.alterTableSchema("db2", "tbl1", newSchema)
+      StructField("new_field_2", StringType)))
+    catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
     val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(newTbl1.schema == newSchema)
+    assert(newTbl1.dataSchema == newDataSchema)
   }
 
   test("get table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9c1b638..a8c6c06 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -452,9 +452,9 @@ abstract class SessionCatalogSuite extends PlanTest {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
-      sessionCatalog.alterTableSchema(
+      sessionCatalog.alterTableDataSchema(
         TableIdentifier("t1", Some("default")),
-        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+        StructType(oldTab.dataSchema.add("c3", IntegerType)))
 
       val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       // construct the expected table schema
@@ -464,13 +464,26 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table add columns which are conflicting with partition columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableDataSchema(
+          TableIdentifier("t1", Some("default")),
+          StructType(oldTab.dataSchema.add("a", IntegerType)))
+      }.getMessage
+      assert(e.contains("Found duplicate column(s): a"))
+    }
+  }
+
   test("alter table drop columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       val e = intercept[AnalysisException] {
-        sessionCatalog.alterTableSchema(
-          TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+        sessionCatalog.alterTableDataSchema(
+          TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1)))
       }.getMessage
       assert(e.contains("We don't support dropping columns yet."))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6348638..8b61240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -187,11 +187,10 @@ case class AlterTableRenameCommand(
 */
 case class AlterTableAddColumnsCommand(
     table: TableIdentifier,
-    columns: Seq[StructField]) extends RunnableCommand {
+    colsToAdd: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = verifyAlterTableAddColumn(catalog, table)
-
     try {
       sparkSession.catalog.uncacheTable(table.quotedString)
     } catch {
@@ -199,12 +198,7 @@ case class AlterTableAddColumnsCommand(
         log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
     }
     catalog.refreshTable(table)
-
-    // make sure any partition columns are at the end of the fields
-    val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
-    catalog.alterTableSchema(
-      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
-
+    catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ colsToAdd))
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2ea4e15..1c26d98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -138,16 +138,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
-   * Checks the validity of column names. Hive metastore disallows the table to use comma in
+   * Checks the validity of data column names. Hive metastore disallows the table to use comma in
    * data column names. Partition columns do not have such a restriction. Views do not have such
    * a restriction.
    */
-  private def verifyColumnNames(table: CatalogTable): Unit = {
-    if (table.tableType != VIEW) {
-      table.dataSchema.map(_.name).foreach { colName =>
+  private def verifyDataSchema(
+      tableName: TableIdentifier, tableType: CatalogTableType, dataSchema: StructType): Unit = {
+    if (tableType != VIEW) {
+      dataSchema.map(_.name).foreach { colName =>
         if (colName.contains(",")) {
           throw new AnalysisException("Cannot create a table having a column whose name contains " +
-            s"commas in Hive metastore. Table: ${table.identifier}; Column: $colName")
+            s"commas in Hive metastore. Table: $tableName; Column: $colName")
         }
       }
     }
@@ -218,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
-    verifyColumnNames(tableDefinition)
+    verifyDataSchema(
+      tableDefinition.identifier, tableDefinition.tableType, tableDefinition.dataSchema)
 
     if (tableExists(db, table) && !ignoreIfExists) {
       throw new TableAlreadyExistsException(db = db, table = table)
@@ -295,7 +297,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         storage = table.storage.copy(
           locationUri = None,
           properties = storagePropsWithLocation),
-        schema = table.partitionSchema,
+        schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema),
         bucketSpec = None,
         properties = table.properties ++ tableProperties)
     }
@@ -312,6 +314,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         None
       }
 
+      // TODO: empty data schema is not hive compatible, we only do it to keep behavior as it was
+      // because previously we generate the special empty schema in `HiveClient`. Remove this in
+      // Spark 2.3.
+      val schema = if (table.dataSchema.isEmpty) {
+        StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema)
+      } else {
+        table.schema
+      }
+
       table.copy(
         storage = table.storage.copy(
           locationUri = location,
@@ -320,6 +331,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           serde = serde.serde,
           properties = storagePropsWithLocation
         ),
+        schema = schema,
         properties = table.properties ++ tableProperties)
     }
 
@@ -630,32 +642,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
+  override def alterTableDataSchema(
+      db: String, table: String, newDataSchema: StructType): Unit = withClient {
     requireTableExists(db, table)
-    val rawTable = getRawTable(db, table)
-    // Add table metadata such as table schema, partition columns, etc. to table properties.
-    val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)
-    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema)
-    verifyColumnNames(withNewSchema)
+    val oldTable = getTable(db, table)
+    verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
+    val schemaProps =
+      tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap
 
-    if (isDatasourceTable(rawTable)) {
+    if (isDatasourceTable(oldTable)) {
       // For data source tables, first try to write it with the schema set; if that does not work,
       // try again with updated properties and the partition schema. This is a simplified version of
       // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
       // (for example, the schema does not match the data source schema, or does not match the
       // storage descriptor).
       try {
-        client.alterTable(withNewSchema)
+        client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
           val warningMessage =
-            s"Could not alter schema of table  ${rawTable.identifier.quotedString} in a Hive " +
+            s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " +
               "compatible way. Updating Hive metastore in Spark SQL specific format."
           logWarning(warningMessage, e)
-          client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
+          client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
       }
     } else {
-      client.alterTable(withNewSchema)
+      client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
     }
   }
 
@@ -1191,6 +1203,15 @@ object HiveExternalCatalog {
   val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
   val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"
 
+  // When storing data source tables in hive metastore, we need to set data schema to empty if the
+  // schema is hive-incompatible. However we need a hack to preserve existing behavior. Before
+  // Spark 2.0, we do not set a default serde here (this was done in Hive), and so if the user
+  // provides an empty schema Hive would automatically populate the schema with a single field
+  // "col". However, after SPARK-14388, we set the default serde to LazySimpleSerde so this
+  // implicit behavior no longer happens. Therefore, we need to do it in Spark ourselves.
+  val EMPTY_DATA_SCHEMA = new StructType()
+    .add("col", "array<string>", nullable = true, comment = "from deserializer")
+
   /**
    * Returns the fully qualified name used in table properties for a particular column stat.
    * For example, for column "mycol", and "min" stat, this should return

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f23b27c..f858dd9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -246,11 +246,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
       inferredSchema match {
         case Some(dataSchema) =>
-          val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
           if (inferenceMode == INFER_AND_SAVE) {
-            updateCatalogSchema(relation.tableMeta.identifier, schema)
+            updateDataSchema(relation.tableMeta.identifier, dataSchema)
           }
-          relation.tableMeta.copy(schema = schema)
+          val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
+          relation.tableMeta.copy(schema = newSchema)
         case None =>
           logWarning(s"Unable to infer schema for table $tableName from file format " +
             s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
@@ -261,10 +261,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try {
-    val db = identifier.database.get
+  private def updateDataSchema(identifier: TableIdentifier, newDataSchema: StructType): Unit = try {
     logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}")
-    sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema)
+    sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
   } catch {
     case NonFatal(ex) =>
       logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9..492a2ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -89,6 +90,16 @@ private[hive] trait HiveClient {
   /** Updates the given table with new metadata, optionally renaming the table. */
   def alterTable(tableName: String, table: CatalogTable): Unit
 
+  /**
+   * Updates the given table with a new data schema and table properties, and keep everything else
+   * unchanged.
+   *
+   * TODO(cloud-fan): it's a little hacky to introduce the schema table properties here in
+   * `HiveClient`, but we don't have a cleaner solution now.
+   */
+  def alterTableDataSchema(
+    dbName: String, tableName: String, newDataSchema: StructType, schemaProps: Map[String, String])
+
   /** Creates a new database with the given name. */
   def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2cf11f4..541797d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,8 +46,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -462,6 +461,33 @@ private[hive] class HiveClientImpl(
     shim.alterTable(client, qualifiedTableName, hiveTable)
   }
 
+  override def alterTableDataSchema(
+      dbName: String,
+      tableName: String,
+      newDataSchema: StructType,
+      schemaProps: Map[String, String]): Unit = withHiveState {
+    val oldTable = client.getTable(dbName, tableName)
+    val hiveCols = newDataSchema.map(toHiveColumn)
+    oldTable.setFields(hiveCols.asJava)
+
+    // remove old schema table properties
+    val it = oldTable.getParameters.entrySet.iterator
+    while (it.hasNext) {
+      val entry = it.next()
+      val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
+        entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS
+      if (isSchemaProp) {
+        it.remove()
+      }
+    }
+
+    // set new schema table properties
+    schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) }
+
+    val qualifiedTableName = s"$dbName.$tableName"
+    shim.alterTable(client, qualifiedTableName, oldTable)
+  }
+
   override def createPartitions(
       db: String,
       table: String,
@@ -837,20 +863,7 @@ private[hive] object HiveClientImpl {
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
-    // so here we should not add a default col schema
-    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
-      // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
-      // set a default serde here (this was done in Hive), and so if the user provides
-      // an empty schema Hive would automatically populate the schema with a single
-      // field "col". However, after SPARK-14388, we set the default serde to
-      // LazySimpleSerde so this implicit behavior no longer happens. Therefore,
-      // we need to do it in Spark ourselves.
-      hiveTable.setFields(
-        Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava)
-    } else {
-      hiveTable.setFields(schema.asJava)
-    }
+    hiveTable.setFields(schema.asJava)
     hiveTable.setPartCols(partCols.asJava)
     userName.foreach(hiveTable.setOwner)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index d43534d..2e35fde 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -89,4 +89,22 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(restoredTable.schema == newSchema)
     }
   }
+
+  test("SPARK-22306: alter table schema should not erase the bucketing metadata at hive side") {
+    val catalog = newBasicCatalog()
+    externalCatalog.client.runSqlHive(
+      """
+        |CREATE TABLE db1.t(a string, b string)
+        |CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string")
+    catalog.alterTableDataSchema("db1", "t", newSchema)
+
+    assert(catalog.getTable("db1", "t").schema == newSchema)
+    val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t")
+      .filter(_.contains("Num Buckets")).head
+    assert(bucketString.contains("10"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 32e97eb..c0acffb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -746,7 +746,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat(
           locationUri = None,
@@ -1271,7 +1271,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier("t", Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat.empty,
         properties = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index 5c248b9..bc82887 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -117,7 +117,7 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before
     spark.sql(createTableStmt)
     val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
     catalog.createTable(oldTable, true)
-    catalog.alterTableSchema("default", tableName, updatedSchema)
+    catalog.alterTableDataSchema("default", tableName, updatedSchema)
 
     val updatedTable = catalog.getTable("default", tableName)
     assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org