Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/21 15:00:09 UTC

[hudi] branch master updated: [HUDI-1776] Support AlterCommand For Hoodie (#3086)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fd8a88  [HUDI-1776] Support AlterCommand For Hoodie (#3086)
4fd8a88 is described below

commit 4fd8a88b7ef4666f0be90eed864fe3cf1ef896ee
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Mon Jun 21 22:58:43 2021 +0800

    [HUDI-1776] Support AlterCommand For Hoodie (#3086)
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  17 ++-
 .../AlterHoodieTableAddColumnsCommand.scala        | 119 ++++++++++++++++++++
 .../AlterHoodieTableChangeColumnCommand.scala      |  93 ++++++++++++++++
 .../command/AlterHoodieTableRenameCommand.scala    |  55 ++++++++++
 .../command/InsertIntoHoodieTableCommand.scala     |  37 ++++---
 .../src/test/resources/sql-statements.sql          |  16 ++-
 .../org/apache/spark/sql/hudi/TestAlterTable.scala | 121 +++++++++++++++++++++
 7 files changed, 439 insertions(+), 19 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index ed77990..bf7b046 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
-import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
 import org.apache.spark.sql.types.StringType
 
 object HoodieAnalysis {
@@ -86,6 +86,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
       case CreateTable(table, mode, Some(query))
         if query.resolved && isHoodieTable(table) =>
           CreateHoodieTableAsSelectCommand(table, mode, query)
+
       case _=> plan
     }
   }
@@ -307,6 +308,18 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if isHoodieTable(table) =>
         CreateHoodieTableCommand(table, ignoreIfExists)
+      // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
+      case AlterTableAddColumnsCommand(tableId, colsToAdd)
+        if isHoodieTable(tableId, sparkSession) =>
+        AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
+      // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
+      case AlterTableRenameCommand(oldName, newName, isView)
+        if !isView && isHoodieTable(oldName, sparkSession) =>
+        new AlterHoodieTableRenameCommand(oldName, newName, isView)
+      // Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand
+      case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
+        if isHoodieTable(tableName, sparkSession) =>
+        AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
       case _ => plan
     }
   }
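
For context, the post-analysis rules above only take effect when the Hudi session
extension is registered with Spark. A minimal setup sketch (the app name is
illustrative; the extension class is the one Hudi ships):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-alter-table-demo") // illustrative
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registers HoodieAnalysis/HoodiePostAnalysisRule, so built-in commands such as
      // AlterTableAddColumnsCommand are rewritten to their Hudi counterparts.
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()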
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
new file mode 100644
index 0000000..b513034
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import java.nio.charset.StandardCharsets
+
+import org.apache.avro.Schema
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{CommitUtils, Option}
+import org.apache.hudi.table.HoodieSparkTable
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.SchemaUtils
+
+import scala.util.control.NonFatal
+
+/**
+ * Command for adding new columns to a Hudi table.
+ */
+case class AlterHoodieTableAddColumnsCommand(
+   tableId: TableIdentifier,
+   colsToAdd: Seq[StructField])
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    if (colsToAdd.nonEmpty) {
+      val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+      // Get the new schema
+      val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
+      val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
+      val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
+
+      // Commit with new schema to change the table schema
+      AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
+
+      // Refresh the catalog with the new schema
+      refreshSchemaInMeta(sparkSession, table, newSqlSchema)
+    }
+    Seq.empty[Row]
+  }
+
+  private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
+                                  newSqlSchema: StructType): Unit = {
+    try {
+      sparkSession.catalog.uncacheTable(tableId.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
+    }
+    sparkSession.catalog.refreshTable(table.identifier.unquotedString)
+
+    SchemaUtils.checkColumnNameDuplication(
+      newSqlSchema.map(_.name),
+      "in the table definition of " + table.identifier,
+      conf.caseSensitiveAnalysis)
+    DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
+
+    sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
+  }
+}
+
+object AlterHoodieTableAddColumnsCommand {
+  /**
+   * Generate an empty commit with the new schema to change the table's schema.
+   * @param schema The new schema to commit.
+   * @param table  The hoodie table.
+   * @param sparkSession The spark session.
+   */
+  def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
+    val path = getTableLocation(table, sparkSession)
+      .getOrElse(throw new AnalysisException(s"missing location for ${table.identifier}"))
+
+    val jsc = new JavaSparkContext(sparkSession.sparkContext)
+    val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
+      path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
+
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
+
+    val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType)
+    val instantTime = HoodieActiveTimeline.createNewInstantTime
+    client.startCommitWithTime(instantTime, commitActionType)
+
+    val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
+    val timeLine = hoodieTable.getActiveTimeline
+    val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime)
+    val metadata = new HoodieCommitMetadata
+    metadata.setOperationType(WriteOperationType.INSERT)
+    timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8)))
+
+    client.commit(instantTime, jsc.emptyRDD)
+  }
+}
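
For reference, the ADD COLUMNS path above can be exercised as follows (a sketch;
the table name and location are illustrative, mirroring TestAlterTable below):

    spark.sql(
      """create table h0 (id int, name string, ts long) using hudi
        | location '/tmp/h0'
        | options (primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)
    spark.sql("insert into h0 values(1, 'a1', 1000)")
    // commitWithSchema writes an empty commit carrying the widened Avro schema,
    // then refreshSchemaInMeta updates the catalog via alterTableDataSchema.
    spark.sql("alter table h0 add columns(ext0 string)")
    spark.sql("select id, name, ts, ext0 from h0").show() // ext0 is null for old rows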
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
new file mode 100644
index 0000000..78334fd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.Schema
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.util.control.NonFatal
+
+/**
+ * Command for altering a Hudi table's column type.
+ */
+case class AlterHoodieTableChangeColumnCommand(
+    tableName: TableIdentifier,
+    columnName: String,
+    newColumn: StructField)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
+
+    if (!resolver(columnName, newColumn.name)) {
+      throw new AnalysisException("Changing a column's name is not currently supported for Hudi tables.")
+    }
+    // Get the new schema
+    val newSqlSchema = StructType(
+      table.dataSchema.fields.map { field =>
+        if (resolver(field.name, columnName)) {
+          newColumn
+        } else {
+          field
+        }
+      })
+    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
+    val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
+
+    val path = getTableLocation(table, sparkSession)
+      .getOrElse(throw new AnalysisException(s"missing location for ${table.identifier}"))
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    // Validate the compatibility between new schema and origin schema.
+    validateSchema(newSchema, metaClient)
+    // Commit new schema to change the table schema
+    AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
+
+    try {
+      sparkSession.catalog.uncacheTable(tableName.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
+    }
+    sparkSession.catalog.refreshTable(tableName.unquotedString)
+    // Change the schema in the meta
+    catalog.alterTableDataSchema(tableName, newSqlSchema)
+
+    Seq.empty[Row]
+  }
+
+  private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    if (!TableSchemaResolver.isSchemaCompatible(tableSchema, newSchema)) {
+      throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
+        ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
+    }
+  }
+}
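
Note that validateSchema only admits Avro-compatible changes, so widening a column
type works while narrowing fails before any commit is written. A sketch, continuing
the illustrative table above:

    // int -> bigint is Avro schema-compatible and passes validateSchema:
    spark.sql("alter table h0 change column id id bigint")
    // An incompatible change (e.g. narrowing bigint back to int) would throw a
    // HoodieException from validateSchema, leaving the table schema untouched.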
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
new file mode 100644
index 0000000..2afef51
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+
+/**
+ * Command for altering a Hudi table's name.
+ */
+class AlterHoodieTableRenameCommand(
+     oldName: TableIdentifier,
+     newName: TableIdentifier,
+     isView: Boolean)
+  extends AlterTableRenameCommand(oldName, newName, isView) {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    if (newName != oldName) {
+      val catalog = sparkSession.sessionState.catalog
+      val table = catalog.getTableMetadata(oldName)
+      val path = getTableLocation(table, sparkSession)
+        .getOrElse(throw new AnalysisException(s"missing location for ${table.identifier}"))
+
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+        .setConf(hadoopConf).build()
+      // Init table with new name.
+      HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(metaClient.getTableConfig.getProperties)
+        .setTableName(newName.table)
+        .initTable(hadoopConf, path)
+      // Call AlterTableRenameCommand#run to rename table in meta.
+      super.run(sparkSession)
+    }
+    Seq.empty[Row]
+  }
+}
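
Renaming re-initializes hoodie.properties with the new table name at the same base
path and then lets AlterTableRenameCommand update the catalog; the data files are
untouched. A sketch of verifying this, with an illustrative path:

    spark.sql("alter table h0 rename to h0_renamed")
    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath("/tmp/h0") // unchanged by the rename
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    assert(metaClient.getTableConfig.getTableName == "h0_renamed")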
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 04da522..2ad9a68 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
@@ -86,11 +86,16 @@ object InsertIntoHoodieTableCommand {
       SaveMode.Append
     }
     val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
-    val queryData = Dataset.ofRows(sparkSession, query)
     val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(queryData, table, insertPartitions, conf)
+    val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
+    // If we create the DataFrame using Dataset.ofRows(sparkSession, alignedQuery),
+    // the nullable attribute of the fields will be lost. In order to pass the
+    // nullable attribute through to the inputDF, we specify the schema of the
+    // rdd explicitly.
+    val inputDF = sparkSession.createDataFrame(
+      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
     val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, alignedQuery)._1
+      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
     if (success) {
       if (refreshTable) {
         sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -110,10 +115,10 @@ object InsertIntoHoodieTableCommand {
    * @return
    */
   private def alignOutputFields(
-    query: DataFrame,
+    query: LogicalPlan,
     table: CatalogTable,
     insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): DataFrame = {
+    conf: SQLConf): LogicalPlan = {
 
     val targetPartitionSchema = table.partitionSchema
 
@@ -124,17 +129,17 @@ object InsertIntoHoodieTableCommand {
         s"is: ${staticPartitionValues.mkString("," + "")}")
 
     val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
-      query.logicalPlan.output.dropRight(targetPartitionSchema.fields.length)
+      query.output.dropRight(targetPartitionSchema.fields.length)
     } else { // insert static partition
-      query.logicalPlan.output
+      query.output
     }
     val targetDataSchema = table.dataSchema
     // Align for the data fields of the query
     val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
       case (dataAttr, targetField) =>
-        val castAttr = castIfNeeded(dataAttr,
+        val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
           targetField.dataType, conf)
-        new Column(Alias(castAttr, targetField.name)())
+        Alias(castAttr, targetField.name)()
     }
 
     val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
@@ -142,23 +147,23 @@ object InsertIntoHoodieTableCommand {
       // So we init the partitionAttrPosition with the data schema size.
       var partitionAttrPosition = targetDataSchema.size
       targetPartitionSchema.fields.map(f => {
-        val partitionAttr = query.logicalPlan.output(partitionAttrPosition)
+        val partitionAttr = query.output(partitionAttrPosition)
         partitionAttrPosition = partitionAttrPosition + 1
-        val castAttr = castIfNeeded(partitionAttr, f.dataType, conf)
-        new Column(Alias(castAttr, f.name)())
+        val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
+        Alias(castAttr, f.name)()
       })
     } else { // insert static partitions
       targetPartitionSchema.fields.map(f => {
         val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
         s"Missing static partition value for: ${f.name}")
         val castAttr = Literal.create(staticPartitionValue, f.dataType)
-        new Column(Alias(castAttr, f.name)())
+        Alias(castAttr, f.name)()
       })
     }
+    // Remove the hoodie meta fields from the projects as we do not need them for the write
-    val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.named.name))
+    val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
     val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
-    query.select(alignedProjects: _*)
+    Project(alignedProjects, query)
   }
 
   /**
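
On the nullability fix above, a minimal sketch of the behavior being worked around
(assuming a resolved LogicalPlan named alignedQuery):

    // Dataset.ofRows re-derives nullability from the logical plan, which can drop
    // the nullability set via withNullability during field alignment.
    val rows = Dataset.ofRows(sparkSession, alignedQuery)
    // Re-creating the DataFrame from the row RDD with an explicit schema keeps it.
    val inputDF = sparkSession.createDataFrame(rows.rdd, alignedQuery.schema)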
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 35dde25..280fde5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -230,6 +230,20 @@ select id, name, price, ts, dt from h1_p order by id;
 | 6 _insert 10.0 1000 2021-05-08 |
 +--------------------------------+
 
+# ALTER TABLE
+alter table h1_p rename to h2_p;
++----------+
+| ok       |
++----------+
+alter table h2_p add columns(ext0 int);
++----------+
+| ok       |
++----------+
+alter table h2_p change column ext0 ext0 bigint;
++----------+
+| ok       |
++----------+
+
 # DROP TABLE
 drop table h0;
 +----------+
@@ -246,7 +260,7 @@ drop table h1;
 | ok       |
 +----------+
 
-drop table h1_p;
+drop table h2_p;
 +----------+
 | ok       |
 +----------+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
new file mode 100644
index 0000000..ee73823
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class TestAlterTable extends TestHoodieSqlBase {
+
+  test("Test Alter Table") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '$tablePath'
+             | options (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+       """.stripMargin)
+        // Alter table name.
+        val newTableName = s"${tableName}_1"
+        spark.sql(s"alter table $tableName rename to $newTableName")
+        assertResult(false)(
+          spark.sessionState.catalog.tableExists(new TableIdentifier(tableName))
+        )
+        assertResult(true) (
+          spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
+        )
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+          .setConf(hadoopConf).build()
+        assertResult(newTableName) (
+          metaClient.getTableConfig.getTableName
+        )
+        spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
+
+        // Add table column
+        spark.sql(s"alter table $newTableName add columns(ext0 string)")
+        val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
+        assertResult(Seq("id", "name", "price", "ts", "ext0")) {
+          HoodieSqlUtils.removeMetaFields(table.schema).fields.map(_.name)
+        }
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1000, null)
+        )
+        // Alter table column type
+        spark.sql(s"alter table $newTableName change column id id bigint")
+        assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
+          spark.sql(s"select id from $newTableName").schema)
+
+        // Insert data into the new table.
+        spark.sql(s"insert into $newTableName values(2, 'a2', 12, 1000, 'e0')")
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 12.0, 1000, "e0")
+        )
+
+        // Merge data into the new table.
+        spark.sql(
+          s"""
+             |merge into $newTableName t0
+             |using (
+             |  select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as ext0
+             |) s0
+             |on t0.id = s0.id
+             |when matched then update set *
+             |when not matched then insert *
+           """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 12.0, 1001, "e0"),
+          Seq(2, "a2", 12.0, 1000, "e0")
+        )
+
+        // Update data in the new table.
+        spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 1")
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1001, null),
+          Seq(2, "a2", 12.0, 1000, "e0")
+        )
+        spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 2")
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(1, "a1", 10.0, 1001, null),
+          Seq(2, "a2", 10.0, 1000, null)
+        )
+
+        // Delete data from the new table.
+        spark.sql(s"delete from $newTableName where id = 1")
+        checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+          Seq(2, "a2", 10.0, 1000, null)
+        )
+      }
+    }
+  }
+}