You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:45 UTC
[19/49] carbondata git commit: [CARBONDATA-1656][Streaming] Reject alter table command for streaming table
[CARBONDATA-1656][Streaming] Reject alter table command for streaming table
This closes #1448
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/87892522
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/87892522
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/87892522
Branch: refs/heads/fgdatamap
Commit: 87892522bfa73a4876e5cfe68bbf9d460a9a1f52
Parents: 4c41f86
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 8 10:37:04 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 8 16:04:04 2017 +0800
----------------------------------------------------------------------
.../schema/AlterTableAddColumnCommand.scala | 115 ------------
.../AlterTableDataTypeChangeCommand.scala | 116 -------------
.../schema/AlterTableDropColumnCommand.scala | 148 ----------------
.../schema/AlterTableRenameTableCommand.scala | 174 -------------------
.../CarbonAlterTableAddColumnCommand.scala | 115 ++++++++++++
.../CarbonAlterTableDataTypeChangeCommand.scala | 116 +++++++++++++
.../CarbonAlterTableDropColumnCommand.scala | 148 ++++++++++++++++
.../schema/CarbonAlterTableRenameCommand.scala | 174 +++++++++++++++++++
.../sql/execution/strategy/DDLStrategy.scala | 10 +-
.../strategy/StreamingTableStrategy.scala | 39 ++++-
.../sql/parser/CarbonSpark2SqlParser.scala | 8 +-
.../TestStreamingTableOperation.scala | 15 ++
.../restructure/AlterTableRevertTestCase.scala | 2 +-
13 files changed, 610 insertions(+), 570 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
deleted file mode 100644
index 6e6a4b1..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
-
-private[sql] case class AlterTableAddColumnCommand(
- alterTableAddColumnsModel: AlterTableAddColumnsModel)
- extends RunnableCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val tableName = alterTableAddColumnsModel.tableName
- val dbName = alterTableAddColumnsModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- var locks = List.empty[ICarbonLock]
- var timeStamp = 0L
- var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
- var carbonTable: CarbonTable = null
- try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
- // Consider a concurrent scenario where 2 alter operations are executed in parallel. 1st
- // operation is success and updates the schema file. 2nd operation will get the lock after
- // completion of 1st operation but as look up relation is called before it will have the
- // older carbon table and this can lead to inconsistent state in the system. Therefor look
- // up relation should be called after acquiring the lock
- val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
- // get the latest carbon table and check for column existence
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getStorePath)
- newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
- dbName,
- wrapperTableInfo,
- carbonTablePath,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath, sparkSession.sparkContext).process
- // generate dictionary files for the newly added columns
- new AlterTableAddColumnRDD(sparkSession.sparkContext,
- newCols,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
- timeStamp = System.currentTimeMillis
- val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
- schemaEvolutionEntry.setTimeStamp(timeStamp)
- schemaEvolutionEntry.setAdded(newCols.toList.asJava)
- val thriftTable = schemaConverter
- .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- AlterTableUtil
- .updateSchemaInfo(carbonTable,
- schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession,
- sparkSession.sessionState.asInstanceOf[CarbonSessionState])
- LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
- } catch {
- case e: Exception =>
- LOGGER.error(e, "Alter table add columns failed")
- if (newCols.nonEmpty) {
- LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
- new AlterTableDropColumnRDD(sparkSession.sparkContext,
- newCols,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
- AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
- }
- sys.error(s"Alter table add operation failed: ${e.getMessage}")
- } finally {
- // release lock after command execution completion
- AlterTableUtil.releaseLocks(locks)
- }
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
deleted file mode 100644
index be87bbb..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
-
-private[sql] case class AlterTableDataTypeChangeCommand(
- alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
- extends RunnableCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val tableName = alterTableDataTypeChangeModel.tableName
- val dbName = alterTableDataTypeChangeModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- var locks = List.empty[ICarbonLock]
- // get the latest carbon table and check for column existence
- var carbonTable: CarbonTable = null
- var timeStamp = 0L
- try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
- val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
- val columnName = alterTableDataTypeChangeModel.columnName
- val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
- if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
- LOGGER.audit(s"Alter table change data type request has failed. " +
- s"Column $columnName does not exist")
- sys.error(s"Column does not exist: $columnName")
- }
- val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
- if (carbonColumn.size == 1) {
- CarbonScalaUtil
- .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
- } else {
- LOGGER.audit(s"Alter table change data type request has failed. " +
- s"Column $columnName is invalid")
- sys.error(s"Invalid Column: $columnName")
- }
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
- // maintain the added column for schema evolution history
- var addColumnSchema: ColumnSchema = null
- var deletedColumnSchema: ColumnSchema = null
- val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
- columnSchemaList.foreach { columnSchema =>
- if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
- deletedColumnSchema = columnSchema.deepCopy
- columnSchema.setData_type(DataTypeConverterUtil
- .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
- columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
- columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
- addColumnSchema = columnSchema
- }
- }
- timeStamp = System.currentTimeMillis
- val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
- schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
- schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
- tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- AlterTableUtil
- .updateSchemaInfo(carbonTable,
- schemaEvolutionEntry,
- tableInfo)(sparkSession,
- sparkSession.sessionState.asInstanceOf[CarbonSessionState])
- LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
- } catch {
- case e: Exception => LOGGER
- .error("Alter table change datatype failed : " + e.getMessage)
- if (carbonTable != null) {
- AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
- }
- sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
- } finally {
- // release lock after command execution completion
- AlterTableUtil.releaseLocks(locks)
- }
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
deleted file mode 100644
index 2f1e3d9..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
-
-private[sql] case class AlterTableDropColumnCommand(
- alterTableDropColumnModel: AlterTableDropColumnModel)
- extends RunnableCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val tableName = alterTableDropColumnModel.tableName
- val dbName = alterTableDropColumnModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
- var locks = List.empty[ICarbonLock]
- var timeStamp = 0L
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- // get the latest carbon table and check for column existence
- var carbonTable: CarbonTable = null
- try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
- val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- carbonTable = metastore
- .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
- val partitionInfo = carbonTable.getPartitionInfo(tableName)
- if (partitionInfo != null) {
- val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
- .map(_.getColumnName)
- // check each column existence in the table
- val partitionColumns = alterTableDropColumnModel.columns.filter {
- tableColumn => partitionColumnSchemaList.contains(tableColumn)
- }
- if (partitionColumns.nonEmpty) {
- throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
- s"$partitionColumns")
- }
- }
- val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
- var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
- .ColumnSchema]()
- var keyColumnCountToBeDeleted = 0
- // TODO: if deleted column list includes bucketted column throw an error
- alterTableDropColumnModel.columns.foreach { column =>
- var columnExist = false
- tableColumns.foreach { tableColumn =>
- // column should not be already deleted and should exist in the table
- if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
- if (tableColumn.isDimension) {
- keyColumnCountToBeDeleted += 1
- if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
- dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
- }
- }
- columnExist = true
- }
- }
- if (!columnExist) {
- sys.error(s"Column $column does not exists in the table $dbName.$tableName")
- }
- }
- // take the total key column count. key column to be deleted should not
- // be >= key columns in schema
- val totalKeyColumnInSchema = tableColumns.count {
- tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
- }
- if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
- sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
- }
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
- // maintain the deleted columns for schema evolution history
- var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
- val columnSchemaList = tableInfo.fact_table.table_columns.asScala
- alterTableDropColumnModel.columns.foreach { column =>
- columnSchemaList.foreach { columnSchema =>
- if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
- deletedColumnSchema += columnSchema.deepCopy
- columnSchema.invisible = true
- }
- }
- }
- // add deleted columns to schema evolution history and update the schema
- timeStamp = System.currentTimeMillis
- val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
- schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
- AlterTableUtil
- .updateSchemaInfo(carbonTable,
- schemaEvolutionEntry,
- tableInfo)(sparkSession,
- sparkSession.sessionState.asInstanceOf[CarbonSessionState])
- // TODO: 1. add check for deletion of index tables
- // delete dictionary files for dictionary column and clear dictionary cache from memory
- new AlterTableDropColumnRDD(sparkSession.sparkContext,
- dictionaryColumns,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).collect()
- LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
- } catch {
- case e: Exception => LOGGER
- .error("Alter table drop columns failed : " + e.getMessage)
- if (carbonTable != null) {
- AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
- }
- sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
- } finally {
- // release lock after command execution completion
- AlterTableUtil.releaseLocks(locks)
- }
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
deleted file mode 100644
index af361d5..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-private[sql] case class AlterTableRenameTableCommand(
- alterTableRenameModel: AlterTableRenameModel)
- extends RunnableCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
- val newTableIdentifier = alterTableRenameModel.newTableIdentifier
- val oldDatabaseName = oldTableIdentifier.database
- .getOrElse(sparkSession.catalog.currentDatabase)
- val newDatabaseName = newTableIdentifier.database
- .getOrElse(sparkSession.catalog.currentDatabase)
- if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
- throw new MalformedCarbonCommandException("Database name should be same for both tables")
- }
- val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
- if (tableExists) {
- throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
- s"already exists")
- }
- val oldTableName = oldTableIdentifier.table.toLowerCase
- val newTableName = newTableIdentifier.table.toLowerCase
- LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
- LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
- val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation: CarbonRelation =
- metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Rename table request has failed. " +
- s"Table $oldDatabaseName.$oldTableName does not exist")
- sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
- }
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
- LockUsage.COMPACTION_LOCK,
- LockUsage.DELETE_SEGMENT_LOCK,
- LockUsage.CLEAN_FILES_LOCK,
- LockUsage.DROP_TABLE_LOCK)
- var locks = List.empty[ICarbonLock]
- var timeStamp = 0L
- var carbonTable: CarbonTable = null
- try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
- sparkSession)
- val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation].tableMeta
- carbonTable = tableMeta.carbonTable
- // get the latest carbon table and check for column existence
- val carbonTablePath = CarbonStorePath.
- getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
- val tableMetadataFile = carbonTablePath.getPath
- val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
- schemaEvolutionEntry.setTableName(newTableName)
- timeStamp = System.currentTimeMillis()
- schemaEvolutionEntry.setTime_stamp(timeStamp)
- renameBadRecords(oldTableName, newTableName, oldDatabaseName)
- val fileType = FileFactory.getFileType(tableMetadataFile)
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
- .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
- newTableName)
- if (!rename) {
- renameBadRecords(newTableName, oldTableName, oldDatabaseName)
- sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
- }
- }
- val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
- newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = metastore.updateTableSchema(newTableIdentifier,
- carbonTable.getCarbonTableIdentifier,
- tableInfo,
- schemaEvolutionEntry,
- tableMeta.tablePath)(sparkSession)
- metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
- s"('tableName'='$newTableName', " +
- s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
- sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
- Some(oldDatabaseName)).quotedString)
- LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
- LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
- } catch {
- case e: Exception =>
- LOGGER.error(e, "Rename table failed: " + e.getMessage)
- if (carbonTable != null) {
- AlterTableUtil
- .revertRenameTableChanges(oldTableIdentifier,
- newTableName,
- carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier.getTableId,
- timeStamp)(
- sparkSession)
- renameBadRecords(newTableName, oldTableName, oldDatabaseName)
- }
- sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
- } finally {
- // release lock after command execution completion
- AlterTableUtil.releaseLocks(locks)
- // case specific to rename table as after table rename old table path will not be found
- if (carbonTable != null) {
- AlterTableUtil
- .releaseLocksManually(locks,
- locksToBeAcquired,
- oldDatabaseName,
- newTableName,
- carbonTable.getStorePath)
- }
- }
- Seq.empty
- }
-
- private def renameBadRecords(
- oldTableName: String,
- newTableName: String,
- dataBaseName: String): Unit = {
- val oldPath = CarbonUtil
- .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
- val newPath = CarbonUtil
- .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
- val fileType = FileFactory.getFileType(oldPath)
- if (FileFactory.isFileExist(oldPath, fileType)) {
- val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
- .renameForce(newPath)
- if (!renameSuccess) {
- sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
new file mode 100644
index 0000000..8737464
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
+
+private[sql] case class CarbonAlterTableAddColumnCommand(
+ alterTableAddColumnsModel: AlterTableAddColumnsModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableAddColumnsModel.tableName
+ val dbName = alterTableAddColumnsModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+ // Consider a concurrent scenario where 2 alter operations are executed in parallel. The
+ // 1st operation succeeds and updates the schema file. The 2nd operation gets the lock after
+ // the 1st completes, but if the relation were looked up before that, it would hold the
+ // older carbon table and this can lead to an inconsistent state in the system. Therefore
+ // the relation lookup should be done after acquiring the lock
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ // get the latest carbon table and check for column existence
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getStorePath)
+ newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
+ dbName,
+ wrapperTableInfo,
+ carbonTablePath,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath, sparkSession.sparkContext).process
+ // generate dictionary files for the newly added columns
+ new AlterTableAddColumnRDD(sparkSession.sparkContext,
+ newCols,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(timeStamp)
+ schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+ val thriftTable = schemaConverter
+ .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+ thriftTable)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, "Alter table add columns failed")
+ if (newCols.nonEmpty) {
+ LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
+ new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ newCols,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table add operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
new file mode 100644
index 0000000..4e180c8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+
+private[sql] case class CarbonAlterTableDataTypeChangeCommand(
+ alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableDataTypeChangeModel.tableName
+ val dbName = alterTableDataTypeChangeModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ var locks = List.empty[ICarbonLock]
+ // holds the latest carbon table; it is looked up only after the locks are acquired
+ var carbonTable: CarbonTable = null
+ var timeStamp = 0L
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ val columnName = alterTableDataTypeChangeModel.columnName
+ val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+ if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName does not exist")
+ sys.error(s"Column does not exist: $columnName")
+ }
+ val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+ if (carbonColumn.size == 1) {
+ CarbonScalaUtil
+ .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
+ } else {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName is invalid")
+ sys.error(s"Invalid Column: $columnName")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ // track the column before (removed) and after (added) the change for schema evolution history
+ var addColumnSchema: ColumnSchema = null
+ var deletedColumnSchema: ColumnSchema = null
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+ columnSchemaList.foreach { columnSchema =>
+ if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+ deletedColumnSchema = columnSchema.deepCopy
+ columnSchema.setData_type(DataTypeConverterUtil
+ .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+ columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+ columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+ addColumnSchema = columnSchema
+ }
+ }
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+ schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+ schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception => LOGGER
+ .error("Alter table change datatype failed : " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
new file mode 100644
index 0000000..3ac23f7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+
+private[sql] case class CarbonAlterTableDropColumnCommand(
+ alterTableDropColumnModel: AlterTableDropColumnModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableDropColumnModel.tableName
+ val dbName = alterTableDropColumnModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ // holds the latest carbon table; it is looked up only after the locks are acquired
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(tableName)
+ if (partitionInfo != null) {
+ val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
+ .map(_.getColumnName)
+ // collect the columns requested for drop that are partition columns
+ val partitionColumns = alterTableDropColumnModel.columns.filter {
+ tableColumn => partitionColumnSchemaList.contains(tableColumn)
+ }
+ if (partitionColumns.nonEmpty) {
+ throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
+ s"$partitionColumns")
+ }
+ }
+ val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+ var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
+ .ColumnSchema]()
+ var keyColumnCountToBeDeleted = 0
+ // TODO: if the deleted column list includes a bucketed column, throw an error
+ alterTableDropColumnModel.columns.foreach { column =>
+ var columnExist = false
+ tableColumns.foreach { tableColumn =>
+ // column should not be already deleted and should exist in the table
+ if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+ if (tableColumn.isDimension) {
+ keyColumnCountToBeDeleted += 1
+ if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+ dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
+ }
+ }
+ columnExist = true
+ }
+ }
+ if (!columnExist) {
+ sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+ }
+ }
+ // take the total key column count. key column to be deleted should not
+ // be >= key columns in schema
+ val totalKeyColumnInSchema = tableColumns.count {
+ tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
+ }
+ if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+ sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ // maintain the deleted columns for schema evolution history
+ var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+ alterTableDropColumnModel.columns.foreach { column =>
+ columnSchemaList.foreach { columnSchema =>
+ if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+ deletedColumnSchema += columnSchema.deepCopy
+ columnSchema.invisible = true
+ }
+ }
+ }
+ // add deleted columns to schema evolution history and update the schema
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+ schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ // TODO: 1. add check for deletion of index tables
+ // delete dictionary files for dictionary column and clear dictionary cache from memory
+ new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ dictionaryColumns,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception => LOGGER
+ .error("Alter table drop columns failed : " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
new file mode 100644
index 0000000..88cf212
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[sql] case class CarbonAlterTableRenameCommand(
+ alterTableRenameModel: AlterTableRenameModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
+ val newTableIdentifier = alterTableRenameModel.newTableIdentifier
+ val oldDatabaseName = oldTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ val newDatabaseName = newTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
+ throw new MalformedCarbonCommandException("Database name should be same for both tables")
+ }
+ val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+ if (tableExists) {
+ throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
+ s"already exists")
+ }
+ val oldTableName = oldTableIdentifier.table.toLowerCase
+ val newTableName = newTableIdentifier.table.toLowerCase
+ LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relation: CarbonRelation =
+ metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Rename table request has failed. " +
+ s"Table $oldDatabaseName.$oldTableName does not exist")
+ sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
+ }
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.DROP_TABLE_LOCK)
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+ sparkSession)
+ val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].tableMeta
+ carbonTable = tableMeta.carbonTable
+ // resolve the table path and read the latest schema file
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
+ val tableMetadataFile = carbonTablePath.getPath
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setTableName(newTableName)
+ timeStamp = System.currentTimeMillis()
+ schemaEvolutionEntry.setTime_stamp(timeStamp)
+ renameBadRecords(oldTableName, newTableName, oldDatabaseName)
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+ .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+ newTableName)
+ if (!rename) {
+ renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+ sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
+ }
+ }
+ val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+ newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+ val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+ carbonTable.getCarbonTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ tableMeta.tablePath)(sparkSession)
+ metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+ s"('tableName'='$newTableName', " +
+ s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+ Some(oldDatabaseName)).quotedString)
+ LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
+ LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, "Rename table failed: " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil
+ .revertRenameTableChanges(oldTableIdentifier,
+ newTableName,
+ carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ timeStamp)(
+ sparkSession)
+ renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+ }
+ sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ // rename-specific: the old table path will not be found after a rename, so use the new name
+ if (carbonTable != null) {
+ AlterTableUtil
+ .releaseLocksManually(locks,
+ locksToBeAcquired,
+ oldDatabaseName,
+ newTableName,
+ carbonTable.getStorePath)
+ }
+ }
+ Seq.empty
+ }
+
+ private def renameBadRecords(
+ oldTableName: String,
+ newTableName: String,
+ dataBaseName: String): Unit = {
+ val oldPath = CarbonUtil
+ .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
+ val newPath = CarbonUtil
+ .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
+ val fileType = FileFactory.getFileType(oldPath)
+ if (FileFactory.isFileExist(oldPath, fileType)) {
+ val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
+ .renameForce(newPath)
+ if (!renameSuccess) {
+ sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bdfaa5a..bf13e41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
-import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.carbondata.core.util.CarbonUtil
@@ -56,7 +56,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
sparkSession)
if (isCarbonTable) {
val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
- ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
+ ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
} else {
ExecutedCommandExec(alter) :: Nil
}
@@ -98,7 +98,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException(
"Operation not allowed : " + altertablemodel.alterSql)
}
- case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
+ case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
alterTableChangeDataTypeModel.databaseName))(sparkSession)
@@ -107,7 +107,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
- case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
+ case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
@@ -116,7 +116,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
- case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
+ case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 0f0bc24..9ebf47e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,8 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -34,12 +35,36 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
- case update@ProjectForUpdateCommand(_, tableIdentifier) =>
- rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data update")
- ExecutedCommandExec(update) :: Nil
- case delete@ProjectForDeleteCommand(_, tableIdentifier, _) =>
- rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Date delete")
- ExecutedCommandExec(delete) :: Nil
+ case ProjectForUpdateCommand(_, tableIdentifier) =>
+ rejectIfStreamingTable(
+ DeleteExecution.getTableIdentifier(tableIdentifier),
+ "Data update")
+ Nil
+ case ProjectForDeleteCommand(_, tableIdentifier, _) =>
+ rejectIfStreamingTable(
+ DeleteExecution.getTableIdentifier(tableIdentifier),
+ "Date delete")
+ Nil
+ case CarbonAlterTableAddColumnCommand(model) =>
+ rejectIfStreamingTable(
+ new TableIdentifier(model.tableName, model.databaseName),
+ "Alter table add column")
+ Nil
+ case CarbonAlterTableDropColumnCommand(model) =>
+ rejectIfStreamingTable(
+ new TableIdentifier(model.tableName, model.databaseName),
+ "Alter table drop column")
+ Nil
+ case CarbonAlterTableDataTypeChangeCommand(model) =>
+ rejectIfStreamingTable(
+ new TableIdentifier(model.tableName, model.databaseName),
+ "Alter table change datatype")
+ Nil
+ case AlterTableRenameCommand(oldTableIdentifier, _, _) =>
+ rejectIfStreamingTable(
+ oldTableIdentifier,
+ "Alter rename table")
+ Nil
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9c87b8b..fc2ed41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
-import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.types.StructField
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -326,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
table.toLowerCase,
columnName.toLowerCase,
columnNameCopy.toLowerCase)
- AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+ CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
}
protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
@@ -395,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
tableModel.dimCols,
tableModel.msrCols,
tableModel.highcardinalitydims.getOrElse(Seq.empty))
- AlterTableAddColumnCommand(alterTableAddColumnsModel)
+ CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
}
private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
@@ -419,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
table.toLowerCase,
values.map(_.toLowerCase))
- AlterTableDropColumnCommand(alterTableDropColumnModel)
+ CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
}
def getFields(schema: Seq[StructField]): Seq[Field] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b733d4f..d5f9426 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -80,6 +80,21 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
}
+ test("test blocking alter table operation on streaming table") {
+ intercept[MalformedCarbonCommandException] {
+ sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
+ }
+ intercept[MalformedCarbonCommandException] {
+ sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
+ }
+ intercept[MalformedCarbonCommandException] {
+ sql("""ALTER TABLE source RENAME to t""").show()
+ }
+ intercept[MalformedCarbonCommandException] {
+ sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
+ }
+ }
+
override def afterAll {
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 29de05b..00170e2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -51,7 +51,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
}
test("test to revert table name on failure") {
- intercept[RuntimeException] {
+ val exception = intercept[RuntimeException] {
new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
sql("alter table reverttest rename to reverttest_fail")
new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()