You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/07 09:55:43 UTC

[40/49] incubator-carbondata git commit: added support to revert changes if query fails

added support to revert changes if query fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2a4f09b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2a4f09b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2a4f09b7

Branch: refs/heads/12-dev
Commit: 2a4f09b7ab125581c7caa2bf57513abc07ac3c7f
Parents: bbade2a
Author: kunal642 <ku...@knoldus.in>
Authored: Wed Mar 15 14:38:31 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 6 17:58:39 2017 +0530

----------------------------------------------------------------------
 .../ThriftWrapperSchemaConverterImpl.java       |   4 +-
 .../carbondata/core/util/DataTypeUtil.java      |  16 +-
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../execution/command/AlterTableCommands.scala  |  48 +++---
 .../apache/spark/sql/hive/CarbonMetastore.scala |  30 +++-
 .../org/apache/spark/util/AlterTableUtil.scala  | 163 ++++++++++++++++++-
 .../restructure/AlterTableRevertTestCase.scala  |  69 ++++++++
 .../spark/sql/common/util/QueryTest.scala       |   3 +
 8 files changed, 303 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 09ed368..974cc81 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -404,8 +404,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       org.apache.carbondata.format.TableInfo externalTableInfo, String dbName, String tableName,
       String storePath) {
     TableInfo wrapperTableInfo = new TableInfo();
+    List<org.apache.carbondata.format.SchemaEvolutionEntry> schemaEvolutionList =
+        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history();
     wrapperTableInfo.setLastUpdatedTime(
-        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history().get(0)
+        schemaEvolutionList.get(schemaEvolutionList.size() - 1)
             .getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
     wrapperTableInfo.setTableUniqueName(dbName + "_" + tableName);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index e437405..76df425 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -573,10 +573,10 @@ public final class DataTypeUtil {
    * Below method will be used to convert the data into byte[]
    *
    * @param data
-   * @param actualDataType actual data type
+   * @param columnSchema column schema whose data type selects the conversion
    * @return actual data in byte[]
    */
-  public static byte[] convertDataToBytesBasedOnDataType(String data, DataType actualDataType) {
+  public static byte[] convertDataToBytesBasedOnDataType(String data, ColumnSchema columnSchema) {
     if (null == data) {
       return null;
     } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
@@ -585,7 +585,7 @@ public final class DataTypeUtil {
     }
     try {
       long parsedIntVal = 0;
-      switch (actualDataType) {
+      switch (columnSchema.getDataType()) {
         case INT:
           parsedIntVal = (long) Integer.parseInt(data);
           return String.valueOf(parsedIntVal)
@@ -602,13 +602,17 @@ public final class DataTypeUtil {
               .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         case DATE:
         case TIMESTAMP:
-          DirectDictionaryGenerator directDictionaryGenerator =
-              DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(actualDataType);
+          DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(columnSchema.getDataType());
           int value = directDictionaryGenerator.generateDirectSurrogateKey(data);
           return String.valueOf(value)
               .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         case DECIMAL:
-          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
+          String parsedValue = parseStringToBigDecimal(data, columnSchema);
+          if (null == parsedValue) {
+            return null;
+          }
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(parsedValue);
           return bigDecimalToByte(javaDecVal);
         default:
           return UTF8String.fromString(data).getBytes();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 367bf46..117b365 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -239,7 +239,7 @@ class AlterTableProcessor(
         if (elem._1.toLowerCase.startsWith(defaultValueString)) {
           if (col.getColumnName.equalsIgnoreCase(elem._1.substring(defaultValueString.length))) {
             rawData = elem._2
-            val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col.getDataType)
+            val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col)
             if (null != data) {
               col.setDefaultValue(data)
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 38fdb11..0be0bdf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -56,6 +57,7 @@ private[sql] case class AlterTableAddColumns(
     // get the latest carbon table and check for column existence
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -90,9 +92,8 @@ private[sql] case class AlterTableAddColumns(
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table add columns failed : ${e.getMessage}")
-        // clean up the dictionary files in case of any failure
+      case e: Exception => LOGGER
+        .error("Alter table add columns failed :" + e.getMessage)
         if (!newCols.isEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
           new AlterTableDropColumnRDD(sparkSession.sparkContext,
@@ -100,6 +101,7 @@ private[sql] case class AlterTableAddColumns(
             carbonTable.getCarbonTableIdentifier,
             carbonTable.getStorePath).collect()
         }
+        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table add column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -151,6 +153,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
         sparkSession)
     val carbonTable = relation.tableMeta.carbonTable
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -160,6 +163,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         .readSchemaFile(tableMetadataFile)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
+      schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
       renameBadRecords(oldTableName, newTableName, oldDatabaseName)
       val fileType = FileFactory.getFileType(tableMetadataFile)
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -167,8 +171,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        newTableName)
         if (!rename) {
-          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
         }
       }
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
@@ -190,8 +194,11 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Rename table failed: ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Rename table failed: " + e.getMessage)
+        AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
+            sparkSession)
+        renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         sys.error("Alter table rename table operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -240,9 +247,10 @@ private[sql] case class AlterTableDropColumns(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     val locks = AlterTableUtil
       .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+    // get the latest carbon table and check for column existence
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
-      // get the latest carbon table and check for column existence
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -269,12 +277,8 @@ private[sql] case class AlterTableDropColumns(
       }
       // take the total key column count. key column to be deleted should not
       // be >= key columns in schema
-      var totalKeyColumnInSchema = 0
-      tableColumns.foreach { tableColumn =>
-        // column should not be already deleted and should exist in the table
-        if (!tableColumn.isInvisible && tableColumn.isDimesion) {
-          totalKeyColumnInSchema += 1
-        }
+      val totalKeyColumnInSchema = tableColumns.count {
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimesion
       }
       if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
         sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
@@ -315,8 +319,9 @@ private[sql] case class AlterTableDropColumns(
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Alter table drop columns failed : " + e.getMessage)
+        AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table drop column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -339,11 +344,11 @@ private[sql] case class AlterTableDataTypeChange(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     val locks = AlterTableUtil
       .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+    // get the latest carbon table and check for column existence
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
-      // get the latest carbon table and check for column existence
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       val columnName = alterTableDataTypeChangeModel.columnName
-      var carbonColumnToBeModified: CarbonColumn = null
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
 
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -393,8 +398,9 @@ private[sql] case class AlterTableDataTypeChange(
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Alter table change datatype failed : " + e.getMessage)
+        AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table data type change operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 6460490..6f74960 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -301,7 +301,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
   }
 
   /**
-   * This method will overwrite the existing schema and update it with the gievn details
+   * This method will overwrite the existing schema and update it with the given details
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
@@ -328,6 +328,34 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
   }
 
   /**
+   * This method is used to remove the latest evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    // Drop the newest evolution entry BEFORE converting: the converter takes the wrapper's
+    // last-updated time from the last history entry, so converting first would leave the
+    // wrapper TableInfo stamped with the entry that is being reverted.
+    // NOTE(review): assumes at least one entry (the creation entry) remains - confirm.
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionEntries.remove(evolutionEntries.size() - 1)
+    val dbName = carbonTableIdentifier.getDatabaseName
+    val tableName = carbonTableIdentifier.getTableName
+    val wrapperTableInfo = new ThriftWrapperSchemaConverterImpl()
+      .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, carbonStorePath)
+    // rewrite the schema file from the trimmed thrift table and return that call's result
+    createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, dbName, tableName)(sparkSession)
+  }
+
+
+
+  /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
    * Load CarbonTable from wrapper tableInfo

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 2e7eebf..5057d75 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.SparkConf
@@ -25,14 +26,28 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
-import org.apache.carbondata.common.logging.LogService
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 
 object AlterTableUtil {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Validates that the table exists and acquires the requested locks on it.
+   *
+   * @param dbName
+   * @param tableName
+   * @param LOGGER
+   * @param sparkSession
+   * @return
+   */
   def validateTableAndAcquireLock(dbName: String,
       tableName: String,
       locksToBeAcquired: List[String],
@@ -125,6 +140,13 @@ object AlterTableUtil {
     }
   }
 
+  /**
+   * @param carbonTable
+   * @param schemaEvolutionEntry
+   * @param thriftTable
+   * @param sparkSession
+   * @param catalog
+   */
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
@@ -167,4 +189,141 @@ object AlterTableUtil {
     }
     schemaParts.mkString(",")
   }
+
+  /**
+   * This method reverts the changes to the schema if the rename table command fails.
+   *
+   * @param oldTableIdentifier
+   * @param newTableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+      newTableName: String,
+      lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    // NOTE(review): getCarbonTable presumably yields null when the rename failed before the
+    // new name was registered; reverting is then skipped instead of hitting an NPE - confirm.
+    Option(CarbonMetadata.getInstance.getCarbonTable(database + "_" + newTableName))
+      .foreach { carbonTable =>
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+          carbonTable.getCarbonTableIdentifier)
+        val tableMetadataFile = carbonTablePath.getSchemaFilePath
+        val fileType = FileFactory.getFileType(tableMetadataFile)
+        val tableInfo = CarbonEnv.get.carbonMetastore.readSchemaFile(tableMetadataFile)
+        val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
+        val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+        if (updatedTime > lastUpdatedTime) {
+          LOGGER.error(s"Reverting changes for $database.${oldTableIdentifier.table}")
+          FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+            .renameForce(carbonTablePath.getParent.toString +
+                         CarbonCommonConstants.FILE_SEPARATOR + oldTableIdentifier.table)
+          val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table,
+            carbonTable.getCarbonTableIdentifier.getTableId)
+          CarbonEnv.get.carbonMetastore.revertTableSchema(tableIdentifier, tableInfo,
+            carbonTable.getStorePath)(sparkSession)
+          CarbonEnv.get.carbonMetastore.removeTableFromMetadata(database, newTableName)
+        }
+      }
+  }
+
+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    // Re-read the schema file; its newest evolution entry is compared with lastUpdatedTime
+    // below, and only a newer entry triggers removal of that entry's `added` columns.
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+    if (updatedTime > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
+      thriftTable.fact_table.table_columns.removeAll(addedSchemas)
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
+  /**
+   * This method reverts the schema changes if drop column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val lastEntry = evolutionEntryList.get(evolutionEntryList.size() - 1)
+    // act only when the schema file holds an entry newer than the pre-command timestamp
+    if (lastEntry.time_stamp > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      // column ids from the newest entry's `removed` list, collected once into a set so
+      // that each table column is checked with a single lookup
+      val droppedColumnIds = lastEntry.removed.asScala.map(_.column_id).toSet
+      thriftTable.fact_table.table_columns.asScala
+        .filter(column => column.invisible && droppedColumnIds.contains(column.column_id))
+        .foreach(_.setInvisible(false))
+      // write the restored schema back; revertTableSchema also drops the newest entry
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
+  /**
+   * This method reverts the changes to schema if the data type change command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val schemaFilePath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier)
+      .getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore.readSchemaFile(schemaFilePath)
+    val history = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val latestEntry = history.get(history.size() - 1)
+    // only act when the schema file carries an entry newer than the pre-command timestamp
+    if (latestEntry.time_stamp > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      // copy type, precision and scale back from the entry's `removed` column schemas onto
+      // the visible table columns carrying the same column id (compared ignoring case)
+      for {
+        column <- thriftTable.fact_table.table_columns.asScala
+        previous <- latestEntry.removed.asScala
+        if column.column_id.equalsIgnoreCase(previous.column_id) && !column.isInvisible
+      } {
+        column.setData_type(previous.data_type)
+        column.setPrecision(previous.precision)
+        column.setScale(previous.scale)
+      }
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
new file mode 100644
index 0000000..958b426
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -0,0 +1,69 @@
+package org.apache.spark.carbondata.restructure
+
+import java.io.File
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.scalatest.BeforeAndAfterAll
+
+
+/** Verifies that a failed ALTER TABLE command leaves the table usable under its old schema. */
+class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll() {
+    sql("drop table if exists reverttest")
+    sql("CREATE TABLE reverttest(intField int,stringField string,timestampField timestamp," +
+        "decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE reverttest " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+  }
+
+  // Runs `body` with Hive authorization switched on and always switches it off again, even
+  // when `body` throws, so a failing statement cannot leak the setting into later tests.
+  private def withHiveAuthorization(body: => Unit): Unit = {
+    hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+    try body finally hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+  }
+
+  test("test to revert new added columns on failure") {
+    withHiveAuthorization {
+      intercept[RuntimeException] {
+        sql("Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+            "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+      }
+    }
+    // kept outside the intercept above so it really runs: the added column must be gone
+    intercept[AnalysisException] { sql("select newField from reverttest") }
+  }
+
+  test("test to revert table name on failure") {
+    val blockingDir = new File(TestQueryExecutor.warehouse + "/reverttest_fail")
+    blockingDir.mkdir()
+    try intercept[RuntimeException] {
+      sql("alter table reverttest rename to reverttest_fail")
+    } finally blockingDir.delete()
+    assert(sql("select * from reverttest").count() == 1L)
+  }
+
+  test("test to revert drop columns on failure") {
+    withHiveAuthorization {
+      intercept[Exception] { sql("Alter table reverttest drop columns(decimalField)") }
+    }
+    assert(sql("select decimalField from reverttest").count().equals(1L))
+  }
+
+  test("test to revert changed datatype on failure") {
+    withHiveAuthorization {
+      intercept[Exception] { sql("Alter table reverttest change intField intfield bigint") }
+    }
+    assert(
+      sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
+  }
+
+  override def afterAll() {
+    hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    sql("drop table if exists reverttest")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2a4f09b7/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 93d1282..c37ea1e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.{DataFrame, Row}
 
@@ -39,6 +40,8 @@ class QueryTest extends PlanTest {
 
   val sqlContext = TestQueryExecutor.INSTANCE.sqlContext
 
+  val hiveClient = sqlContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
   val resourcesPath = TestQueryExecutor.resourcesPath
 
   def sql(sqlText: String): DataFrame  = TestQueryExecutor.INSTANCE.sql(sqlText)