Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/21 15:00:09 UTC
[hudi] branch master updated: [HUDI-1776] Support AlterCommand For Hoodie (#3086)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd8a88 [HUDI-1776] Support AlterCommand For Hoodie (#3086)
4fd8a88 is described below
commit 4fd8a88b7ef4666f0be90eed864fe3cf1ef896ee
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Mon Jun 21 22:58:43 2021 +0800
[HUDI-1776] Support AlterCommand For Hoodie (#3086)
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 17 ++-
.../AlterHoodieTableAddColumnsCommand.scala | 119 ++++++++++++++++++++
.../AlterHoodieTableChangeColumnCommand.scala | 93 ++++++++++++++++
.../command/AlterHoodieTableRenameCommand.scala | 55 ++++++++++
.../command/InsertIntoHoodieTableCommand.scala | 37 ++++---
.../src/test/resources/sql-statements.sql | 16 ++-
.../org/apache/spark/sql/hudi/TestAlterTable.scala | 121 +++++++++++++++++++++
7 files changed, 439 insertions(+), 19 deletions(-)
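For context, this commit wires Spark SQL's ALTER TABLE statements through to hudi-aware command implementations. A minimal usage sketch (the table name h0 is illustrative; the statements mirror the new tests below):

    // Rename the table, add a column, then widen a column's type.
    spark.sql("alter table h0 rename to h0_renamed")
    spark.sql("alter table h0_renamed add columns(ext0 string)")
    spark.sql("alter table h0_renamed change column id id bigint")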
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index ed77990..bf7b046 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
-import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.types.StringType
object HoodieAnalysis {
@@ -86,6 +86,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
case CreateTable(table, mode, Some(query))
if query.resolved && isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
+
case _=> plan
}
}
@@ -307,6 +308,18 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case CreateDataSourceTableCommand(table, ignoreIfExists)
if isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
+ // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
+ case AlterTableAddColumnsCommand(tableId, colsToAdd)
+ if isHoodieTable(tableId, sparkSession) =>
+ AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
+ // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
+ case AlterTableRenameCommand(oldName, newName, isView)
+ if !isView && isHoodieTable(oldName, sparkSession) =>
+ new AlterHoodieTableRenameCommand(oldName, newName, isView)
+ // Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand
+ case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
+ if isHoodieTable(tableName, sparkSession) =>
+ AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case _ => plan
}
}
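The hunk above registers the new commands in HoodiePostAnalysisRule: after analysis, Spark's built-in DDL commands are swapped for hudi-specific ones whenever the target is a hudi table. The general shape of such a rewrite rule, as a standalone sketch (ExampleRewriteRule and isHudiTable are stand-ins for illustration, not the actual Hudi code):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
    import org.apache.spark.sql.hudi.command.AlterHoodieTableAddColumnsCommand

    case class ExampleRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan match {
        // Intercept the built-in command only for hudi tables; any other
        // plan falls through to Spark's default handling.
        case AlterTableAddColumnsCommand(tableId, colsToAdd) if isHudiTable(tableId) =>
          AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
        case _ => plan
      }

      // Stand-in for HoodieSqlUtils.isHoodieTable: inspect the table provider.
      private def isHudiTable(tableId: TableIdentifier): Boolean =
        spark.sessionState.catalog.getTableMetadata(tableId)
          .provider.exists(_.equalsIgnoreCase("hudi"))
    }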
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
new file mode 100644
index 0000000..b513034
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import java.nio.charset.StandardCharsets
+
+import org.apache.avro.Schema
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.{CommitUtils, Option}
+import org.apache.hudi.table.HoodieSparkTable
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.SchemaUtils
+
+import scala.util.control.NonFatal
+
+/**
+ * Command for adding new columns to a hudi table.
+ */
+case class AlterHoodieTableAddColumnsCommand(
+ tableId: TableIdentifier,
+ colsToAdd: Seq[StructField])
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ if (colsToAdd.nonEmpty) {
+ val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+ // Get the new schema
+ val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
+ val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
+ val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
+
+ // Commit with new schema to change the table schema
+ AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
+
+ // Refresh the catalog meta with the new schema
+ refreshSchemaInMeta(sparkSession, table, newSqlSchema)
+ }
+ Seq.empty[Row]
+ }
+
+ private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
+ newSqlSchema: StructType): Unit = {
+ try {
+ sparkSession.catalog.uncacheTable(tableId.quotedString)
+ } catch {
+ case NonFatal(e) =>
+ log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
+ }
+ sparkSession.catalog.refreshTable(table.identifier.unquotedString)
+
+ SchemaUtils.checkColumnNameDuplication(
+ newSqlSchema.map(_.name),
+ "in the table definition of " + table.identifier,
+ conf.caseSensitiveAnalysis)
+ DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
+
+ sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
+ }
+}
+
+object AlterHoodieTableAddColumnsCommand {
+ /**
+ * Generate an empty commit with new schema to change the table's schema.
+ * @param schema The new schema to commit.
+ * @param table The hoodie table.
+ * @param sparkSession The spark session.
+ */
+ def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
+ val path = getTableLocation(table, sparkSession)
+ .getOrElse(s"missing location for ${table.identifier}")
+
+ val jsc = new JavaSparkContext(sparkSession.sparkContext)
+ val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
+ path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
+
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
+
+ val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType)
+ val instantTime = HoodieActiveTimeline.createNewInstantTime
+ client.startCommitWithTime(instantTime, commitActionType)
+
+ val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
+ val timeLine = hoodieTable.getActiveTimeline
+ val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime)
+ val metadata = new HoodieCommitMetadata
+ metadata.setOperationType(WriteOperationType.INSERT)
+ timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8)))
+
+ client.commit(instantTime, jsc.emptyRDD)
+ }
+}
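The key idea in commitWithSchema is that a hudi table's schema is carried in the commit metadata on the timeline, so evolving the schema only needs an empty commit that records the new Avro schema; no data files are rewritten. A small sketch of the schema assembly step (field names and the record name/namespace are hypothetical):

    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.sql.types._

    // Existing table schema plus the columns being added.
    val existing = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    val colsToAdd = Seq(StructField("ext0", StringType, nullable = true))
    val newSqlSchema = StructType(existing.fields ++ colsToAdd)

    // Convert to Avro, as convertStructTypeToAvroSchema does above; the
    // resulting schema is then attached to an otherwise-empty commit via
    // client.commit(instantTime, jsc.emptyRDD).
    val newAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
      newSqlSchema, "h0_record", "hoodie.h0")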
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
new file mode 100644
index 0000000..78334fd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.avro.Schema
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.util.control.NonFatal
+
+/**
+ * Command for altering a hudi table's column type.
+ */
+case class AlterHoodieTableChangeColumnCommand(
+ tableName: TableIdentifier,
+ columnName: String,
+ newColumn: StructField)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
+
+ if (!resolver(columnName, newColumn.name)) {
+ throw new AnalysisException("Changing a column's name is not currently supported for hudi tables.")
+ }
+ // Get the new schema
+ val newSqlSchema = StructType(
+ table.dataSchema.fields.map { field =>
+ if (resolver(field.name, columnName)) {
+ newColumn
+ } else {
+ field
+ }
+ })
+ val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
+ val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
+
+ val path = getTableLocation(table, sparkSession)
+ .getOrElse(s"missing location for ${table.identifier}")
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+ .setConf(hadoopConf).build()
+ // Validate the compatibility between new schema and origin schema.
+ validateSchema(newSchema, metaClient)
+ // Commit new schema to change the table schema
+ AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)
+
+ try {
+ sparkSession.catalog.uncacheTable(tableName.quotedString)
+ } catch {
+ case NonFatal(e) =>
+ log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
+ }
+ sparkSession.catalog.refreshTable(tableName.unquotedString)
+ // Change the schema in the meta
+ catalog.alterTableDataSchema(tableName, newSqlSchema)
+
+ Seq.empty[Row]
+ }
+
+ private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+ if (!TableSchemaResolver.isSchemaCompatible(tableSchema, newSchema)) {
+ throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema +
+ ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath)
+ }
+ }
+}
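validateSchema delegates to TableSchemaResolver.isSchemaCompatible, which boils down to Avro reader/writer compatibility: everything written with the old schema must remain readable with the new one. Expressed with plain Avro (a sketch of the rule being enforced, not the Hudi code path):

    import org.apache.avro.{Schema, SchemaCompatibility}

    // True if data written with `writtenWith` can be read using `readWith`.
    def canReadWith(writtenWith: Schema, readWith: Schema): Boolean =
      SchemaCompatibility.checkReaderWriterCompatibility(readWith, writtenWith)
        .getType == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE

    // Widening int -> long passes, narrowing long -> int does not, so e.g.
    // "change column id id int" on a bigint column would be rejected.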
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
new file mode 100644
index 0000000..2afef51
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.AlterTableRenameCommand
+import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
+
+/**
+ * Command for renaming a hudi table.
+ */
+class AlterHoodieTableRenameCommand(
+ oldName: TableIdentifier,
+ newName: TableIdentifier,
+ isView: Boolean)
+ extends AlterTableRenameCommand(oldName, newName, isView) {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ if (newName != oldName) {
+ val catalog = sparkSession.sessionState.catalog
+ val table = catalog.getTableMetadata(oldName)
+ val path = getTableLocation(table, sparkSession)
+ .getOrElse(s"missing location for ${table.identifier}")
+
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+ .setConf(hadoopConf).build()
+ // Init table with new name.
+ HoodieTableMetaClient.withPropertyBuilder()
+ .fromProperties(metaClient.getTableConfig.getProperties)
+ .setTableName(newName.table)
+ .initTable(hadoopConf, path)
+ // Call AlterTableRenameCommand#run to rename table in meta.
+ super.run(sparkSession)
+ }
+ Seq.empty[Row]
+ }
+}
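A rename touches two places: hoodie.properties under the table's base path (rewritten by initTable above) and the Spark catalog entry (updated by super.run). A quick way to confirm both after a rename, mirroring the assertions in the new TestAlterTable below (h0/h1 and tablePath are illustrative):

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.catalyst.TableIdentifier

    spark.sql("alter table h0 rename to h1")
    // The catalog resolves the new name...
    assert(spark.sessionState.catalog.tableExists(TableIdentifier("h1")))
    // ...and hoodie.properties at the (unchanged) base path carries it too.
    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(tablePath) // the table's location, assumed known
      .setConf(spark.sessionState.newHadoopConf()).build()
    assert(metaClient.getTableConfig.getTableName == "h1")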
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 04da522..2ad9a68 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
@@ -86,11 +86,16 @@ object InsertIntoHoodieTableCommand {
SaveMode.Append
}
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
- val queryData = Dataset.ofRows(sparkSession, query)
val conf = sparkSession.sessionState.conf
- val alignedQuery = alignOutputFields(queryData, table, insertPartitions, conf)
+ val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
+ // If we create the DataFrame using Dataset.ofRows(sparkSession, alignedQuery),
+ // the nullable attribute of the fields will be lost.
+ // In order to pass the nullable attribute through to the inputDF, we specify
+ // the schema of the RDD explicitly.
+ val inputDF = sparkSession.createDataFrame(
+ Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
val success =
- HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, alignedQuery)._1
+ HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
if (success) {
if (refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -110,10 +115,10 @@ object InsertIntoHoodieTableCommand {
* @return
*/
private def alignOutputFields(
- query: DataFrame,
+ query: LogicalPlan,
table: CatalogTable,
insertPartitions: Map[String, Option[String]],
- conf: SQLConf): DataFrame = {
+ conf: SQLConf): LogicalPlan = {
val targetPartitionSchema = table.partitionSchema
@@ -124,17 +129,17 @@ object InsertIntoHoodieTableCommand {
s"is: ${staticPartitionValues.mkString("," + "")}")
val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
- query.logicalPlan.output.dropRight(targetPartitionSchema.fields.length)
+ query.output.dropRight(targetPartitionSchema.fields.length)
} else { // insert static partition
- query.logicalPlan.output
+ query.output
}
val targetDataSchema = table.dataSchema
// Align for the data fields of the query
val dataProjects = queryDataFields.zip(targetDataSchema.fields).map {
case (dataAttr, targetField) =>
- val castAttr = castIfNeeded(dataAttr,
+ val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
targetField.dataType, conf)
- new Column(Alias(castAttr, targetField.name)())
+ Alias(castAttr, targetField.name)()
}
val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
@@ -142,23 +147,23 @@ object InsertIntoHoodieTableCommand {
// So we init the partitionAttrPosition with the data schema size.
var partitionAttrPosition = targetDataSchema.size
targetPartitionSchema.fields.map(f => {
- val partitionAttr = query.logicalPlan.output(partitionAttrPosition)
+ val partitionAttr = query.output(partitionAttrPosition)
partitionAttrPosition = partitionAttrPosition + 1
- val castAttr = castIfNeeded(partitionAttr, f.dataType, conf)
- new Column(Alias(castAttr, f.name)())
+ val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
+ Alias(castAttr, f.name)()
})
} else { // insert static partitions
targetPartitionSchema.fields.map(f => {
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
s"Missing static partition value for: ${f.name}")
val castAttr = Literal.create(staticPartitionValue, f.dataType)
- new Column(Alias(castAttr, f.name)())
+ Alias(castAttr, f.name)()
})
}
// Remove the hoodie meta fields from the projects as we do not need them for the write
- val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.named.name))
+ val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
- query.select(alignedProjects: _*)
+ Project(alignedProjects, query)
}
/**
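The InsertIntoHoodieTableCommand change works around Dataset.ofRows re-deriving nullability during analysis, which would discard the withNullability(...) alignment done above; createDataFrame(rdd, schema) takes the schema verbatim. A self-contained illustration (assuming an active spark session):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))
    val rows = spark.sparkContext.parallelize(Seq(Row(1, "a")))

    // The explicit schema, including its nullable flags, survives exactly.
    val df = spark.createDataFrame(rows, schema)
    assert(df.schema == schema)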
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 35dde25..280fde5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -230,6 +230,20 @@ select id, name, price, ts, dt from h1_p order by id;
| 6 _insert 10.0 1000 2021-05-08 |
+--------------------------------+
+# ALTER TABLE
+alter table h1_p rename to h2_p;
++----------+
+| ok |
++----------+
+alter table h2_p add columns(ext0 int);
++----------+
+| ok |
++----------+
+alter table h2_p change column ext0 ext0 bigint;
++----------+
+| ok |
++----------+
+
# DROP TABLE
drop table h0;
+----------+
@@ -246,7 +260,7 @@ drop table h1;
| ok |
+----------+
-drop table h1_p;
+drop table h2_p;
+----------+
| ok |
+----------+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
new file mode 100644
index 0000000..ee73823
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class TestAlterTable extends TestHoodieSqlBase {
+
+ test("Test Alter Table") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // Alter table name.
+ val newTableName = s"${tableName}_1"
+ spark.sql(s"alter table $tableName rename to $newTableName")
+ assertResult(false)(
+ spark.sessionState.catalog.tableExists(new TableIdentifier(tableName))
+ )
+ assertResult(true) (
+ spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
+ )
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+ .setConf(hadoopConf).build()
+ assertResult(newTableName) (
+ metaClient.getTableConfig.getTableName
+ )
+ spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
+
+ // Add table column
+ spark.sql(s"alter table $newTableName add columns(ext0 string)")
+ val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
+ assertResult(Seq("id", "name", "price", "ts", "ext0")) {
+ HoodieSqlUtils.removeMetaFields(table.schema).fields.map(_.name)
+ }
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(1, "a1", 10.0, 1000, null)
+ )
+ // Alter table column type
+ spark.sql(s"alter table $newTableName change column id id bigint")
+ assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
+ spark.sql(s"select id from $newTableName").schema)
+
+ // Insert data to the new table.
+ spark.sql(s"insert into $newTableName values(2, 'a2', 12, 1000, 'e0')")
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(1, "a1", 10.0, 1000, null),
+ Seq(2, "a2", 12.0, 1000, "e0")
+ )
+
+ // Merge data to the new table.
+ spark.sql(
+ s"""
+ |merge into $newTableName t0
+ |using (
+ | select 1 as id, 'a1' as name, 12 as price, 1001 as ts, 'e0' as ext0
+ |) s0
+ |on t0.id = s0.id
+ |when matched then update set *
+ |when not matched then insert *
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(1, "a1", 12.0, 1001, "e0"),
+ Seq(2, "a2", 12.0, 1000, "e0")
+ )
+
+ // Update data to the new table.
+ spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 1")
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(1, "a1", 10.0, 1001, null),
+ Seq(2, "a2", 12.0, 1000, "e0")
+ )
+ spark.sql(s"update $newTableName set price = 10, ext0 = null where id = 2")
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(1, "a1", 10.0, 1001, null),
+ Seq(2, "a2", 10.0, 1000, null)
+ )
+
+ // Delete data from the new table.
+ spark.sql(s"delete from $newTableName where id = 1")
+ checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
+ Seq(2, "a2", 10.0, 1000, null)
+ )
+ }
+ }
+ }
+}