You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/08/24 11:24:26 UTC

[carbondata] branch master updated: [CARBONDATA-3933]Fix DDL/DML failures after table is created with column names having special characters like #, \, %

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 57bcc92  [CARBONDATA-3933]Fix DDL/DML failures after table is created with column names having special characters like #,\,%
57bcc92 is described below

commit 57bcc9214e54219bcd76a3d19ba8446e4ed6dc8a
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Fri Jul 24 11:50:20 2020 +0530

    [CARBONDATA-3933]Fix DDL/DML failures after table is created
    with column names having special characters like #,\,%
    
    Why is this PR needed?
    When operations like insert, describe, or select are fired after a
    table is created with column names having special characters,
    the operations fail. This is because after table creation Spark has already
    stored the column names with special characters correctly,
    but for further operations we try to update/refresh the metastore
    with new schema parts, which is unnecessary and causes this issue.
    
    What changes were proposed in this PR?
    We use an unnecessary API to update the serde properties of the table by
    calling spark-sql. This was required when we used to support
    Spark 2.1 and 2.2, when Spark did not support many alter
    table operations. Now, since we use other APIs to alter the table,
    these API calls are not necessary. Once we remove them, we in turn
    avoid updating the serde properties with the modified schema parts, which solves this issue.
    
    This closes #3862
---
 .../schema/CarbonAlterTableAddColumnCommand.scala  |  4 +--
 ...nAlterTableColRenameDataTypeChangeCommand.scala |  4 +--
 .../schema/CarbonAlterTableDropColumnCommand.scala |  6 ++--
 .../schema/CarbonAlterTableRenameCommand.scala     |  3 +-
 .../spark/sql/hive/CarbonSessionCatalog.scala      | 30 ----------------
 .../spark/sql/hive/CarbonSessionCatalogUtil.scala  | 40 +++++++---------------
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  |  2 --
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  3 +-
 .../org/apache/spark/util/AlterTableUtil.scala     | 37 +++-----------------
 .../spark/sql/hive/CarbonSessionStateBuilder.scala | 19 ----------
 .../spark/sql/hive/CarbonSessionStateBuilder.scala | 19 ----------
 .../testsuite/dataload/TestLoadDataGeneral.scala   | 14 ++++++++
 .../restructure/AlterTableRevertTestCase.scala     | 23 ++++++++-----
 13 files changed, 56 insertions(+), 148 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 8b79b70..2e86d14 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -93,7 +93,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
       // sort the new columns based on schema order
       val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal)
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
@@ -110,7 +110,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       } else {
         Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
       }
-      CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
+      CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, cols, sparkSession)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
         AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 1c27dd7..8ce774a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -313,10 +313,10 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     } else {
       Some(carbonColumns)
     }
-    val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+    val tableIdentifier = AlterTableUtil.updateSchemaInfo(
       carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
     CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, columns, sparkSession)
+      tableIdentifier, columns, sparkSession)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 2289aad..55f4c03 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -160,12 +160,12 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       val delCols = deletedColumnSchema.map { deleteCols =>
         schemaConverter.fromExternalToWrapperColumnSchema(deleteCols)
       }
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
         carbonTable,
         schemaEvolutionEntry,
         tableInfo)(sparkSession)
       // get the columns in schema order and filter the dropped column in the column set
-      val cols = carbonTable.getCreateOrderColumn().asScala
+      val cols = carbonTable.getCreateOrderColumn.asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
         .filterNot(column => delCols.contains(column))
       // When we call
@@ -181,7 +181,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         Some(cols)
       }
       CarbonSessionCatalogUtil.alterDropColumns(
-        tableIdentifier, schemaParts, columns, sparkSession)
+        tableIdentifier, columns, sparkSession)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 919ce91..05052aa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -140,6 +140,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         tableInfo,
         schemaEvolutionEntry,
         carbonTable.getTablePath)(sparkSession)
+      new MockClassForAlterRevertTests().mockForAlterRevertTest()
 
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 316d329..99ceebf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -69,34 +69,4 @@ trait CarbonSessionCatalog {
       storage: CatalogStorageFormat,
       newTableName: String,
       dbName: String): CatalogStorageFormat
-
-  /**
-   * Below method will be used to add new column
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
-
-  /**
-   * Below method will be used to drop column
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
-
-  /**
-   * Below method will be used to alter data type of column in schema
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index 7288239..beaebed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -61,21 +61,6 @@ object CarbonSessionCatalogUtil {
       s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
   }
 
-  /**
-   * Below method will be used to update serde properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE `${tableIdentifier.database.get}`.`${tableIdentifier.table}` " +
-                  s"SET TBLPROPERTIES($schemaParts)")
-  }
-
   def alterTableProperties(
       sparkSession: SparkSession,
       tableIdentifier: TableIdentifier,
@@ -85,11 +70,11 @@ object CarbonSessionCatalogUtil {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableIdentifier)
     var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
+    if (propKeys.nonEmpty) {
       val updatedPropKeys = propKeys.map(_.toLowerCase)
       newProperties = newProperties.filter { case (k, _) => !updatedPropKeys.contains(k) }
     }
-    if (!properties.isEmpty) {
+    if (properties.nonEmpty) {
       newProperties = newProperties ++ CarbonSparkSqlParserUtil.normalizeProperties(properties)
     }
     val newTable = table.copy(
@@ -113,21 +98,18 @@ object CarbonSessionCatalogUtil {
   }
 
   def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
+    updateCatalogTableForAlter(tableIdentifier, cols, sparkSession)
   }
 
   /**
@@ -135,16 +117,14 @@ object CarbonSessionCatalogUtil {
    * schema for all the alter operations like add column, drop column, change datatype or rename
    * column
    * @param tableIdentifier
-   * @param schemaParts
    * @param cols
    */
   private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
       cols: Option[Seq[ColumnSchema]],
       sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
+    new MockClassForAlterRevertTests().mockForAlterRevertTest()
     CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
+      tableIdentifier, cols, sparkSession)
   }
 
   /**
@@ -173,3 +153,9 @@ object CarbonSessionCatalogUtil {
     storage.copy(locationUri = Some(path.toUri))
   }
 }
+
+// This class is a dummy class to test the alter table scenarios in failure cases.
+class MockClassForAlterRevertTests {
+  def mockForAlterRevertTest(): Unit = {
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index 1b6d8f0..a6d3626 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -125,12 +125,10 @@ object CarbonSessionUtil {
    * @param tableIdentifier tableIdentifier for table
    * @param cols            all the column of table, which are updated with datatype change of
    *                        new column name
-   * @param schemaParts     schemaParts
    * @param sparkSession    sparkSession
    */
   def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
       cols: Option[Seq[ColumnSchema]],
-      schemaParts: String,
       sparkSession: SparkSession): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
     val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index ea2c653..9bb7eb6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -358,10 +358,9 @@ object CarbonIndexUtil {
           tblPropertiesMap.put(property._1, property._2)
         }
       }
-      val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
+      val tableIdentifier = AlterTableUtil.updateSchemaInfo(
         carbonTable = carbonTable,
         thriftTable = thriftTable)(sparkSession)
-      CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemaParts, None, sparkSession)
       // remove from the cache so that the table will be loaded again with the new table properties
       CarbonInternalMetastore
         .removeTableFromMetadataCache(carbonTable.getDatabaseName, tableName)(sparkSession)
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index afab626..9f29382 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -196,7 +196,7 @@ object AlterTableUtil {
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry = null,
       thriftTable: TableInfo)
-    (sparkSession: SparkSession): (TableIdentifier, String) = {
+    (sparkSession: SparkSession): TableIdentifier = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -206,35 +206,9 @@ object AlterTableUtil {
         schemaEvolutionEntry,
         carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     val tableIdentifier = TableIdentifier(tableName, Some(dbName))
-    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
-    val schema = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-      .lookupRelation(tableIdentifier)(sparkSession).schema.json
-    val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
-    (tableIdentifier, schemaParts)
-  }
-
-  /**
-   * This method will split schema string into multiple parts of configured size and
-   * registers the parts as keys in tableProperties which will be read by spark to prepare
-   * Carbon Table fields
-   *
-   * @param sparkConf
-   * @param schemaJsonString
-   * @return
-   */
-  def prepareSchemaJsonForAlterTable(sparkConf: SparkConf,
-      schemaJsonString: String): String = {
-    val threshold = sparkConf
-      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
-        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    var schemaParts: Seq[String] = Seq.empty
-    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
-    parts.zipWithIndex.foreach { case (part, index) =>
-      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
-    }
-    schemaParts.mkString(",")
+    CarbonEnv.getInstance(sparkSession).carbonMetaStore
+      .lookupRelation(tableIdentifier)(sparkSession)
+    tableIdentifier
   }
 
   /**
@@ -522,11 +496,10 @@ object AlterTableUtil {
         // check if duplicate columns are present in both local dictionary include and exclude
         CarbonScalaUtil.validateDuplicateColumnsForLocalDict(tblPropertiesMap)
       }
-      val (tableIdentifier, schemaParts) = updateSchemaInfo(
+      val tableIdentifier = updateSchemaInfo(
         carbonTable = carbonTable,
         schemaEvolutionEntry,
         thriftTable = thriftTable)(sparkSession)
-      CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemaParts, None, sparkSession)
       CarbonSessionCatalogUtil.alterTableProperties(
         sparkSession, tableIdentifier, lowerCasePropertiesMap.toMap, propKeys)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 832596e..8a9f8f3 100644
--- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -112,25 +112,6 @@ class CarbonHiveSessionCatalog(
     CarbonSessionCatalogUtil.getClient(sparkSession)
   }
 
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 33438e4..b3666d1 100644
--- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -112,25 +112,6 @@ class CarbonHiveSessionCatalog(
     CarbonSessionCatalogUtil.getClient(sparkSession)
   }
 
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
   /**
    * This is alternate way of getting partition information. It first fetches all partitions from
    * hive and then apply filter instead of querying hive along with filters.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 1e81554..6ae9e6b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -347,6 +347,20 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
       CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
   }
 
+  test("test table creation with special char and other commands") {
+    sql("drop table if exists special_char")
+    sql("create table special_char(`i#d` string, `nam(e` string,`ci)&#@!ty` string,`a\be` int, `ag!e` float, `na^me1` Decimal(8,4)) stored as carbondata")
+    sql("insert into special_char values('1','joey','hud', 2, 2.2, 2.3456)")
+    checkAnswer(sql("select * from special_char"), Seq(Row("1","joey","hud", 2, 2.2, 2.3456)))
+    val df = sql("describe formatted special_char").collect()
+    assert(df.exists(_.get(0).toString.contains("i#d")))
+    assert(df.exists(_.get(0).toString.contains("nam(e")))
+    assert(df.exists(_.get(0).toString.contains("ci)&#@!ty")))
+    assert(df.exists(_.get(0).toString.contains("a\be")))
+    assert(df.exists(_.get(0).toString.contains("ag!e")))
+    assert(df.exists(_.get(0).toString.contains("na^me1")))
+  }
+
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 4e61916..5d2f510 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -19,7 +19,9 @@ package org.apache.spark.carbondata.restructure
 
 import java.io.File
 
+import mockit.{Mock, MockUp}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.hive.MockClassForAlterRevertTests
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -30,6 +32,13 @@ import org.apache.carbondata.spark.exception.ProcessMetaDataException
 class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll() {
+    new MockUp[MockClassForAlterRevertTests]() {
+      @Mock
+      @throws[ProcessMetaDataException]
+      def mockForAlterRevertTest(): Unit = {
+        throw new ProcessMetaDataException("default", "reverttest", "thrown in mock")
+      }
+    }
     sql("drop table if exists reverttest")
     sql(
       "CREATE TABLE reverttest(intField int,stringField string,timestampField timestamp," +
@@ -40,11 +49,9 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test to revert new added columns on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
         "('DEFAULT.VALUE.newField'='def')")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
       }
@@ -64,18 +71,14 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test to revert drop columns on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest drop columns(decimalField)")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     }
     assert(sql("select decimalField from reverttest").count().equals(1L))
   }
 
   test("test to revert changed datatype on failure") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest change intField intfield bigint")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     }
     assert(
       sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
@@ -83,11 +86,9 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test to check if dictionary files are deleted for new column if query fails") {
     intercept[ProcessMetaDataException] {
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
         "('DEFAULT.VALUE.newField'='def')")
-      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
       }
@@ -98,7 +99,11 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   override def afterAll() {
-    hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    new MockUp[MockClassForAlterRevertTests]() {
+      @Mock
+      def mockForAlterRevertTest(): Unit = {
+      }
+    }
     sql("drop table if exists reverttest")
     sql("drop table if exists reverttest_fail")
   }