You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/07 09:55:43 UTC
[40/49] incubator-carbondata git commit: added support to revert
changes if query fails
added support to revert changes if query fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2a4f09b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2a4f09b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2a4f09b7
Branch: refs/heads/12-dev
Commit: 2a4f09b7ab125581c7caa2bf57513abc07ac3c7f
Parents: bbade2a
Author: kunal642 <ku...@knoldus.in>
Authored: Wed Mar 15 14:38:31 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 17:58:39 2017 +0530
----------------------------------------------------------------------
.../ThriftWrapperSchemaConverterImpl.java | 4 +-
.../carbondata/core/util/DataTypeUtil.java | 16 +-
.../execution/command/carbonTableSchema.scala | 2 +-
.../execution/command/AlterTableCommands.scala | 48 +++---
.../apache/spark/sql/hive/CarbonMetastore.scala | 30 +++-
.../org/apache/spark/util/AlterTableUtil.scala | 163 ++++++++++++++++++-
.../restructure/AlterTableRevertTestCase.scala | 69 ++++++++
.../spark/sql/common/util/QueryTest.scala | 3 +
8 files changed, 303 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 09ed368..974cc81 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -404,8 +404,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
org.apache.carbondata.format.TableInfo externalTableInfo, String dbName, String tableName,
String storePath) {
TableInfo wrapperTableInfo = new TableInfo();
+ List<org.apache.carbondata.format.SchemaEvolutionEntry> schemaEvolutionList =
+ externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history();
wrapperTableInfo.setLastUpdatedTime(
- externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history().get(0)
+ schemaEvolutionList.get(schemaEvolutionList.size() - 1)
.getTime_stamp());
wrapperTableInfo.setDatabaseName(dbName);
wrapperTableInfo.setTableUniqueName(dbName + "_" + tableName);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index e437405..76df425 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -573,10 +573,10 @@ public final class DataTypeUtil {
* Below method will be used to convert the data into byte[]
*
* @param data
- * @param actualDataType actual data type
+ * @param ColumnSchema
* @return actual data in byte[]
*/
- public static byte[] convertDataToBytesBasedOnDataType(String data, DataType actualDataType) {
+ public static byte[] convertDataToBytesBasedOnDataType(String data, ColumnSchema columnSchema) {
if (null == data) {
return null;
} else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
@@ -585,7 +585,7 @@ public final class DataTypeUtil {
}
try {
long parsedIntVal = 0;
- switch (actualDataType) {
+ switch (columnSchema.getDataType()) {
case INT:
parsedIntVal = (long) Integer.parseInt(data);
return String.valueOf(parsedIntVal)
@@ -602,13 +602,17 @@ public final class DataTypeUtil {
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
case DATE:
case TIMESTAMP:
- DirectDictionaryGenerator directDictionaryGenerator =
- DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(actualDataType);
+ DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(columnSchema.getDataType());
int value = directDictionaryGenerator.generateDirectSurrogateKey(data);
return String.valueOf(value)
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
case DECIMAL:
- java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
+ String parsedValue = parseStringToBigDecimal(data, columnSchema);
+ if (null == parsedValue) {
+ return null;
+ }
+ java.math.BigDecimal javaDecVal = new java.math.BigDecimal(parsedValue);
return bigDecimalToByte(javaDecVal);
default:
return UTF8String.fromString(data).getBytes();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 367bf46..117b365 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -239,7 +239,7 @@ class AlterTableProcessor(
if (elem._1.toLowerCase.startsWith(defaultValueString)) {
if (col.getColumnName.equalsIgnoreCase(elem._1.substring(defaultValueString.length))) {
rawData = elem._2
- val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col.getDataType)
+ val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col)
if (null != data) {
col.setDefaultValue(data)
} else {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 38fdb11..0be0bdf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -56,6 +57,7 @@ private[sql] case class AlterTableAddColumns(
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+ val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -90,9 +92,8 @@ private[sql] case class AlterTableAddColumns(
LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
} catch {
- case e: Exception =>
- LOGGER.error(e, s"Alter table add columns failed : ${e.getMessage}")
- // clean up the dictionary files in case of any failure
+ case e: Exception => LOGGER
+ .error("Alter table add columns failed :" + e.getMessage)
if (!newCols.isEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
new AlterTableDropColumnRDD(sparkSession.sparkContext,
@@ -100,6 +101,7 @@ private[sql] case class AlterTableAddColumns(
carbonTable.getCarbonTableIdentifier,
carbonTable.getStorePath).collect()
}
+ AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
sys.error("Alter table add column operation failed. Please check the logs")
} finally {
// release lock after command execution completion
@@ -151,6 +153,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
.validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
sparkSession)
val carbonTable = relation.tableMeta.carbonTable
+ val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -160,6 +163,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
.readSchemaFile(tableMetadataFile)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
schemaEvolutionEntry.setTableName(newTableName)
+ schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
renameBadRecords(oldTableName, newTableName, oldDatabaseName)
val fileType = FileFactory.getFileType(tableMetadataFile)
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -167,8 +171,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
newTableName)
if (!rename) {
- sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+ sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
}
}
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
@@ -190,8 +194,11 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
} catch {
- case e: Exception =>
- LOGGER.error(e, s"Rename table failed: ${e.getMessage}")
+ case e: Exception => LOGGER
+ .error("Rename table failed: " + e.getMessage)
+ AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
+ sparkSession)
+ renameBadRecords(newTableName, oldTableName, oldDatabaseName)
sys.error("Alter table rename table operation failed. Please check the logs")
} finally {
// release lock after command execution completion
@@ -240,9 +247,10 @@ private[sql] case class AlterTableDropColumns(
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
val locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
// check each column existence in the table
val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -269,12 +277,8 @@ private[sql] case class AlterTableDropColumns(
}
// take the total key column count. key column to be deleted should not
// be >= key columns in schema
- var totalKeyColumnInSchema = 0
- tableColumns.foreach { tableColumn =>
- // column should not be already deleted and should exist in the table
- if (!tableColumn.isInvisible && tableColumn.isDimesion) {
- totalKeyColumnInSchema += 1
- }
+ val totalKeyColumnInSchema = tableColumns.count {
+ tableColumn => !tableColumn.isInvisible && tableColumn.isDimesion
}
if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
@@ -315,8 +319,9 @@ private[sql] case class AlterTableDropColumns(
LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
} catch {
- case e: Exception =>
- LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}")
+ case e: Exception => LOGGER
+ .error("Alter table drop columns failed : " + e.getMessage)
+ AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
sys.error("Alter table drop column operation failed. Please check the logs")
} finally {
// release lock after command execution completion
@@ -339,11 +344,11 @@ private[sql] case class AlterTableDataTypeChange(
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
val locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
val columnName = alterTableDataTypeChangeModel.columnName
- var carbonColumnToBeModified: CarbonColumn = null
val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -393,8 +398,9 @@ private[sql] case class AlterTableDataTypeChange(
LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
} catch {
- case e: Exception =>
- LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}")
+ case e: Exception => LOGGER
+ .error("Alter table change datatype failed : " + e.getMessage)
+ AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
sys.error("Alter table data type change operation failed. Please check the logs")
} finally {
// release lock after command execution completion
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 6460490..6f74960 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -301,7 +301,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
}
/**
- * This method will overwrite the existing schema and update it with the gievn details
+ * This method will overwrite the existing schema and update it with the given details
*
* @param carbonTableIdentifier
* @param thriftTableInfo
@@ -328,6 +328,34 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
}
/**
+ * This method will is used to remove the evolution entry in case of failure.
+ *
+ * @param carbonTableIdentifier
+ * @param thriftTableInfo
+ * @param carbonStorePath
+ * @param sparkSession
+ */
+ def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ carbonStorePath: String)
+ (sparkSession: SparkSession): String = {
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName,
+ carbonStorePath)
+ val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+ evolutionEntries.remove(evolutionEntries.size() - 1)
+ createSchemaThriftFile(wrapperTableInfo,
+ thriftTableInfo,
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName)(sparkSession)
+ }
+
+
+
+ /**
*
* Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
* Load CarbonTable from wrapper tableInfo
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 2e7eebf..5057d75 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util
+import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
@@ -25,14 +26,28 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.carbondata.common.logging.LogService
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
object AlterTableUtil {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Validates that the table exists and acquires meta lock on it.
+ *
+ * @param dbName
+ * @param tableName
+ * @param LOGGER
+ * @param sparkSession
+ * @return
+ */
def validateTableAndAcquireLock(dbName: String,
tableName: String,
locksToBeAcquired: List[String],
@@ -125,6 +140,13 @@ object AlterTableUtil {
}
}
+ /**
+ * @param carbonTable
+ * @param schemaEvolutionEntry
+ * @param thriftTable
+ * @param sparkSession
+ * @param catalog
+ */
def updateSchemaInfo(carbonTable: CarbonTable,
schemaEvolutionEntry: SchemaEvolutionEntry,
thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
@@ -167,4 +189,141 @@ object AlterTableUtil {
}
schemaParts.mkString(",")
}
+
+ /**
+ * This method reverts the changes to the schema if the rename table command fails.
+ *
+ * @param oldTableIdentifier
+ * @param newTableName
+ * @param lastUpdatedTime
+ * @param sparkSession
+ */
+ def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+ newTableName: String,
+ lastUpdatedTime: Long)
+ (sparkSession: SparkSession): Unit = {
+ val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val carbonTable: CarbonTable = CarbonMetadata.getInstance
+ .getCarbonTable(database + "_" + newTableName)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
+ val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+ if (updatedTime > lastUpdatedTime) {
+ LOGGER.error(s"Reverting changes for $database.${oldTableIdentifier.table}")
+ FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+ .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+ oldTableIdentifier.table)
+ val tableIdentifier = new CarbonTableIdentifier(database,
+ oldTableIdentifier.table,
+ carbonTable.getCarbonTableIdentifier.getTableId)
+ CarbonEnv.get.carbonMetastore.revertTableSchema(tableIdentifier,
+ tableInfo,
+ carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.get.carbonMetastore.removeTableFromMetadata(database, newTableName)
+ }
+ }
+
+ /**
+ * This method reverts the changes to the schema if add column command fails.
+ *
+ * @param dbName
+ * @param tableName
+ * @param lastUpdatedTime
+ * @param sparkSession
+ */
+ def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+ (sparkSession: SparkSession): Unit = {
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+
+
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+ val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+ if (updatedTime > lastUpdatedTime) {
+ LOGGER.error(s"Reverting changes for $dbName.$tableName")
+ val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
+ thriftTable.fact_table.table_columns.removeAll(addedSchemas)
+ CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
+ }
+ }
+
+ /**
+ * This method reverts the schema changes if drop table command fails.
+ *
+ * @param dbName
+ * @param tableName
+ * @param lastUpdatedTime
+ * @param sparkSession
+ */
+ def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+ (sparkSession: SparkSession): Unit = {
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+ val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+ if (updatedTime > lastUpdatedTime) {
+ LOGGER.error(s"Reverting changes for $dbName.$tableName")
+ val removedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+ thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
+ removedSchemas.asScala.foreach { removedSchemas =>
+ if (columnSchema.invisible && removedSchemas.column_id == columnSchema.column_id) {
+ columnSchema.setInvisible(false)
+ }
+ }
+ }
+ CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
+ }
+ }
+
+ /**
+ * This method reverts the changes to schema if the data type change command fails.
+ *
+ * @param dbName
+ * @param tableName
+ * @param lastUpdatedTime
+ * @param sparkSession
+ */
+ def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+ (sparkSession: SparkSession): Unit = {
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+ val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+ if (updatedTime > lastUpdatedTime) {
+ LOGGER.error(s"Reverting changes for $dbName.$tableName")
+ val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+ thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
+ removedColumns.asScala.foreach { removedColumn =>
+ if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) &&
+ !columnSchema.isInvisible) {
+ columnSchema.setData_type(removedColumn.data_type)
+ columnSchema.setPrecision(removedColumn.precision)
+ columnSchema.setScale(removedColumn.scale)
+ }
+ }
+ }
+ CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
new file mode 100644
index 0000000..958b426
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -0,0 +1,69 @@
+package org.apache.spark.carbondata.restructure
+
+import java.io.File
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.scalatest.BeforeAndAfterAll
+
+
+class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll() {
+ sql("drop table if exists reverttest")
+ sql(
+ "CREATE TABLE reverttest(intField int,stringField string,timestampField timestamp," +
+ "decimalField decimal(6,2)) STORED BY 'carbondata'")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE reverttest " +
+ s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+ }
+
+ test("test to revert new added columns on failure") {
+ intercept[RuntimeException] {
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+ sql(
+ "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+ "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+ intercept[AnalysisException] {
+ sql("select newField from reverttest")
+ }
+ }
+ }
+
+ test("test to revert table name on failure") {
+ intercept[RuntimeException] {
+ new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
+ sql("alter table reverttest rename to reverttest_fail")
+ new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()
+ }
+ val result = sql("select * from reverttest").count()
+ assert(result.equals(1L))
+ }
+
+ test("test to revert drop columns on failure") {
+ intercept[Exception] {
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+ sql("Alter table reverttest drop columns(decimalField)")
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+ }
+ assert(sql("select decimalField from reverttest").count().equals(1L))
+ }
+
+ test("test to revert changed datatype on failure") {
+ intercept[Exception] {
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+ sql("Alter table reverttest change intField intfield bigint")
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+ }
+ assert(
+ sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
+ }
+
+ override def afterAll() {
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+ sql("drop table if exists reverttest")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 93d1282..c37ea1e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.spark.sql.{DataFrame, Row}
@@ -39,6 +40,8 @@ class QueryTest extends PlanTest {
val sqlContext = TestQueryExecutor.INSTANCE.sqlContext
+ val hiveClient = sqlContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
val resourcesPath = TestQueryExecutor.resourcesPath
def sql(sqlText: String): DataFrame = TestQueryExecutor.INSTANCE.sql(sqlText)