You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/27 06:34:05 UTC

[2/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
new file mode 100644
index 0000000..21b974a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/**
+ * Command for Alter Table Add & Split partition
+ * Add is a special case of Splitting the default partition (part0)
+ */
+case class AlterTableSplitCarbonPartitionCommand(
+    splitPartitionModel: AlterTableSplitPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+  // Schema step runs first, then data step. TODO: add rollback in case of process data failure
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession) // result is discarded; only processData's result is returned
+    processData(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val tableName = splitPartitionModel.tableName
+    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    // check for a missing relation before dereferencing it, so the error below is reachable
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    val storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    val table = relation.tableMeta.carbonTable
+    val partitionInfo = table.getPartitionInfo(tableName)
+    // reject unsupported tables before reading partition ids or touching oldPartitionIds
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    if (partitionInfo.getPartitionType == PartitionType.HASH) {
+      sys.error(s"Hash partition table cannot be added or split!")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of partitionIdList before update partitionInfo.
+    // will be used in partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+    updatePartitionInfo(partitionInfo, partitionIds)
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read the thrift TableInfo and attach the updated partition info to its fact table
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  private def updatePartitionInfo(partitionInfo: PartitionInfo,
+      partitionIds: List[Int]) = {
+    // patterns are read from CarbonProperties; the *_DEFAULT_FORMAT constants go in as 2nd arg
+    val dateFmt = new SimpleDateFormat(
+      CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+    val timestampFmt = new SimpleDateFormat(
+      CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    val targetPartitionId = splitPartitionModel.partitionId.toInt
+    val splitInfo = splitPartitionModel.splitInfo
+
+    // delegate to PartitionUtils with the target partition id and the requested split info
+    PartitionUtils.updatePartitionInfo(
+      partitionInfo, partitionIds, targetPartitionId, splitInfo, timestampFmt, dateFmt)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = splitPartitionModel.tableName
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      // `locks` stays empty if acquisition throws; finally then releases an empty list
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK, LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK, LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+      val storePath = relation.tableMeta.storePath
+      val table = relation.tableMeta.carbonTable
+      val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      // oldPartitionIds was filled by processSchema before the partition info was updated
+      CarbonDataRDDFactory.alterTableSplitPartition(
+        sparkSession.sqlContext,
+        splitPartitionModel.partitionId.toInt.toString,
+        carbonLoadModel,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        // log the cause with its stack trace: the message below sends the user to the logs
+        LOGGER.error(e, "Alter table add/split partition failed")
+        sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+      CacheProvider.getInstance().dropAllCache()
+      LOGGER.info("Locks released after alter table add/split partition action.")
+      LOGGER.audit("Locks released after alter table add/split partition action.")
+      if (success) {
+        LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+      }
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
new file mode 100644
index 0000000..224304a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.{RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Command for SHOW PARTITIONS on a Carbon table; throws if the table has no partition info.
+ */
+private[sql] case class ShowCarbonPartitionsCommand(
+    tableIdentifier: TableIdentifier)
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
+
+  override def run(sparkSession: SparkSession): Seq[Row] = processSchema(sparkSession)
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val table = metaStore.lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+    val factTableName = table.getFactTableName
+    val identifierTableName =
+      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    val partitionInfo = table.getPartitionInfo(identifierTableName)
+    // a null partition info is reported to the user as "not partitioned"
+    if (partitionInfo == null) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $factTableName")
+    }
+    val partitionType = partitionInfo.getPartitionType
+    // only the first entry of the partition column list is used
+    val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
+    LOGGER.info("partition column name:" + partitionColumn)
+    CommonUtil.getPartitionInfo(partitionColumn, partitionType, partitionInfo)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
new file mode 100644
index 0000000..6e6a4b1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
+
+private[sql] case class AlterTableAddColumnCommand(
+    alterTableAddColumnsModel: AlterTableAddColumnsModel)
+  extends RunnableCommand {
+  // Adds columns under METADATA/COMPACTION locks; failure cleanup runs only if newCols is set.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableAddColumnsModel.tableName
+    val dbName = alterTableAddColumnsModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L // stays 0 until just before the schema update; passed to revert
+    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      // Consider a concurrent scenario where 2 alter operations are executed in parallel. The
+      // 1st succeeds and updates the schema file. The 2nd gets the lock after the 1st completes,
+      // but if the relation had been looked up before locking, the 2nd would still hold the older
+      // carbon table, which can lead to an inconsistent state in the system. Therefore the
+      // relation must be looked up only after acquiring the lock.
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      // read the latest schema file of the table that was just looked up
+      // and convert the thrift TableInfo into its wrapper form
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
+        dbName,
+        wrapperTableInfo,
+        carbonTablePath,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath, sparkSession.sparkContext).process
+      // generate dictionary files for the newly added columns
+      new AlterTableAddColumnRDD(sparkSession.sparkContext,
+        newCols,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(timeStamp)
+      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaEvolutionEntry = schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+          thriftTable)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Alter table add columns failed")
+        if (newCols.nonEmpty) { // carbonTable is already assigned whenever newCols is non-empty
+          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
+          new AlterTableDropColumnRDD(sparkSession.sparkContext,
+            newCols,
+            carbonTable.getCarbonTableIdentifier,
+            carbonTable.getStorePath).collect()
+          AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table add operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
new file mode 100644
index 0000000..be87bbb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+
+private[sql] case class AlterTableDataTypeChangeCommand(
+    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+  extends RunnableCommand {
+  // Changes one column's data type and records the change as a schema evolution entry.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableDataTypeChangeModel.tableName
+    val dbName = alterTableDataTypeChangeModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    // assigned once the locks are held; a non-null value enables the revert in the catch block
+    var carbonTable: CarbonTable = null
+    var timeStamp = 0L
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      val columnName = alterTableDataTypeChangeModel.columnName
+      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName does not exist")
+        sys.error(s"Column does not exist: $columnName")
+      }
+      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+      if (carbonColumn.size == 1) {
+        CarbonScalaUtil
+          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
+      } else {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName is invalid")
+        sys.error(s"Invalid Column: $columnName")
+      }
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      // keep the pre-change copy (removed) and the updated schema (added) for evolution history
+      var addColumnSchema: ColumnSchema = null
+      var deletedColumnSchema: ColumnSchema = null
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+      columnSchemaList.foreach { columnSchema =>
+        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+          deletedColumnSchema = columnSchema.deepCopy
+          columnSchema.setData_type(DataTypeConverterUtil
+            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+          addColumnSchema = columnSchema
+        }
+      }
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(System.currentTimeMillis)
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaEvolutionEntry,
+          tableInfo)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Alter table change datatype failed : " + e.getMessage)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
new file mode 100644
index 0000000..2f1e3d9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+
+private[sql] case class AlterTableDropColumnCommand(
+    alterTableDropColumnModel: AlterTableDropColumnModel)
+  extends RunnableCommand {
+  // Drops columns by marking them invisible in the schema; partition columns are rejected.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableDropColumnModel.tableName
+    val dbName = alterTableDropColumnModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    // assigned once the locks are held; a non-null value enables the revert in the catch block
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      val partitionInfo = carbonTable.getPartitionInfo(tableName)
+      if (partitionInfo != null) {
+        val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
+          .map(_.getColumnName)
+        // find requested columns that are partition columns; match ignoring case, like below
+        val partitionColumns = alterTableDropColumnModel.columns.filter {
+          column => partitionColumnSchemaList.exists(name => column.equalsIgnoreCase(name))
+        }
+        if (partitionColumns.nonEmpty) {
+          throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
+                                                  s"$partitionColumns")
+        }
+      }
+      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+      var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
+      .ColumnSchema]()
+      var keyColumnCountToBeDeleted = 0
+      // TODO: if deleted column list includes bucketted column throw an error
+      alterTableDropColumnModel.columns.foreach { column =>
+        var columnExist = false
+        tableColumns.foreach { tableColumn =>
+          // column should not be already deleted and should exist in the table
+          if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+            if (tableColumn.isDimension) {
+              keyColumnCountToBeDeleted += 1
+              if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
+              }
+            }
+            columnExist = true
+          }
+        }
+        if (!columnExist) {
+          sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+        }
+      }
+      // take the total key column count. key column to be deleted should not
+      // be >= key columns in schema
+      val totalKeyColumnInSchema = tableColumns.count {
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
+      }
+      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+        sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+      }
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      // maintain the deleted columns for schema evolution history
+      val deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+      alterTableDropColumnModel.columns.foreach { column =>
+        columnSchemaList.foreach { columnSchema =>
+          if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+            deletedColumnSchema += columnSchema.deepCopy
+            columnSchema.invisible = true
+          }
+        }
+      }
+      // add deleted columns to schema evolution history and update the schema
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaEvolutionEntry,
+          tableInfo)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      // TODO: 1. add check for deletion of index tables
+      // delete dictionary files for dictionary column and clear dictionary cache from memory
+      new AlterTableDropColumnRDD(sparkSession.sparkContext,
+        dictionaryColumns,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
+      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Alter table drop columns failed : " + e.getMessage)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
new file mode 100644
index 0000000..af361d5
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[sql] case class AlterTableRenameTableCommand(
+    alterTableRenameModel: AlterTableRenameModel)
+  extends RunnableCommand {
+
+  /**
+   * Renames a carbon table inside a single database.
+   *
+   * The command validates the request, acquires the table locks, renames the bad-records
+   * folder and the table folder, updates the carbon schema through the metastore and finally
+   * issues the matching Hive `ALTER TABLE` statements. If any step inside the locked section
+   * fails, the changes are reverted and the failure is re-raised through `sys.error`.
+   *
+   * @param sparkSession session used for catalog, metastore and Hive access
+   * @return always an empty row sequence
+   * @throws MalformedCarbonCommandException if the two identifiers resolve to different
+   *                                         databases or the new table name already exists
+   */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
+    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
+    // a missing database part falls back to the current database of the session
+    val oldDatabaseName = oldTableIdentifier.database
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    val newDatabaseName = newTableIdentifier.database
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    // rename across databases is rejected
+    if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
+      throw new MalformedCarbonCommandException("Database name should be same for both tables")
+    }
+    // both databases are equal at this point, so the old database name is used for the lookup
+    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+    if (tableExists) {
+      throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
+                                                s"already exists")
+    }
+    val oldTableName = oldTableIdentifier.table.toLowerCase
+    val newTableName = newTableIdentifier.table.toLowerCase
+    LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+    LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val relation: CarbonRelation =
+      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Rename table request has failed. " +
+                   s"Table $oldDatabaseName.$oldTableName does not exist")
+      sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
+    }
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.COMPACTION_LOCK,
+      LockUsage.DELETE_SEGMENT_LOCK,
+      LockUsage.CLEAN_FILES_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    // declared outside the try block because the catch and finally blocks read them
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+          sparkSession)
+      // look the relation up again now that the locks are held
+      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta
+      carbonTable = tableMeta.carbonTable
+      // resolve the carbon table path and read the thrift table info stored for it
+      val carbonTablePath = CarbonStorePath.
+        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
+      val tableMetadataFile = carbonTablePath.getPath
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      schemaEvolutionEntry.setTableName(newTableName)
+      // the entry is re-stamped with the value kept in timeStamp, which is the value later
+      // handed to revertRenameTableChanges if the rename fails
+      timeStamp = System.currentTimeMillis()
+      schemaEvolutionEntry.setTime_stamp(timeStamp)
+      renameBadRecords(oldTableName, newTableName, oldDatabaseName)
+      val fileType = FileFactory.getFileType(tableMetadataFile)
+      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+        // rename the table folder to <parent>/<newTableName>
+        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+                       newTableName)
+        if (!rename) {
+          // undo the bad-records rename done above before failing
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
+        }
+      }
+      // the new identifier keeps the table id of the existing table
+      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
+        tableInfo,
+        schemaEvolutionEntry,
+        tableMeta.tablePath)(sparkSession)
+      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+      // rename the table in Hive, then point its serde properties at the new name and path
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+          s"('tableName'='$newTableName', " +
+          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+        Some(oldDatabaseName)).quotedString)
+      LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
+      LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Rename table failed: " + e.getMessage)
+        // carbonTable is still null when the failure happened before the table was resolved;
+        // in that case nothing has been changed yet and there is nothing to revert
+        if (carbonTable != null) {
+          AlterTableUtil
+            .revertRenameTableChanges(oldTableIdentifier,
+              newTableName,
+              carbonTable.getStorePath,
+              carbonTable.getCarbonTableIdentifier.getTableId,
+              timeStamp)(
+              sparkSession)
+          // move the bad-records folder back; a no-op when the new folder does not exist
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+        }
+        // only the message of the original exception is carried over, not the exception itself
+        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+      // case specific to rename table as after table rename old table path will not be found
+      if (carbonTable != null) {
+        AlterTableUtil
+          .releaseLocksManually(locks,
+            locksToBeAcquired,
+            oldDatabaseName,
+            newTableName,
+            carbonTable.getStorePath)
+      }
+    }
+    Seq.empty
+  }
+
+  /**
+   * Moves the bad-records folder of a table to the location that belongs to another table
+   * name in the same database. Nothing happens when the source folder does not exist.
+   *
+   * @param oldTableName table name whose bad-records folder is moved
+   * @param newTableName table name that determines the target folder
+   * @param dataBaseName database that contains both table names
+   */
+  private def renameBadRecords(
+      oldTableName: String,
+      newTableName: String,
+      dataBaseName: String): Unit = {
+    // bad-records location for a table name inside the given database
+    def badRecordsLocation(table: String): String = {
+      CarbonUtil.getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + table)
+    }
+
+    val sourceLocation = badRecordsLocation(oldTableName)
+    val targetLocation = badRecordsLocation(newTableName)
+    val sourceFileType = FileFactory.getFileType(sourceLocation)
+    // a missing source folder counts as success; otherwise the rename result decides
+    val moved = !FileFactory.isFileExist(sourceLocation, sourceFileType) ||
+                FileFactory.getCarbonFile(sourceLocation, sourceFileType)
+                  .renameForce(targetLocation)
+    if (!moved) {
+      sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
new file mode 100644
index 0000000..c6ca950
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Carbon specific optimization for late decode (convert dictionary key to value as late as
+ * possible), which can improve the aggregation performance and reduce memory usage
+ */
+private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+  val PUSHED_FILTERS = "PushedFilters"
+
+  /**
+   * Plans the two logical shapes this strategy knows about: a projection/filter on top of a
+   * carbon relation, and a [[CarbonDictionaryCatalystDecoder]] node. Every other plan yields
+   * `Nil`.
+   */
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(projectList, predicates, logicalRelation: LogicalRelation)
+      if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+      val carbonSource =
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      // builds the scan RDD for the requested columns and wraps it for catalyst
+      val buildRdd = (columns: Seq[Attribute],
+          pushed: Array[Filter],
+          toDecode: ArrayBuffer[AttributeReference]) => {
+        toCatalystRDD(
+          logicalRelation,
+          columns,
+          carbonSource.buildScan(columns.map(_.name).toArray, pushed),
+          toDecode)
+      }
+      pruneFilterProject(logicalRelation, projectList, predicates, buildRdd) :: Nil
+
+    case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
+      val emptyIncludeProfile = profile.isInstanceOf[IncludeProfile] && profile.isEmpty
+      // the column mapping is only computed when the profile check does not settle it
+      val decodeNotNeeded = emptyIncludeProfile ||
+        !CarbonDictionaryDecoder.isRequiredToDecode(
+          CarbonDictionaryDecoder.getDictionaryColumnMapping(
+            child.output, relations, profile, aliasMap))
+      if (decodeNotNeeded) {
+        planLater(child) :: Nil
+      } else {
+        CarbonDictionaryDecoder(relations,
+          profile,
+          aliasMap,
+          planLater(child),
+          SparkSession.getActiveSession.get
+        ) :: Nil
+      }
+
+    case _ => Nil
+  }
+
+
+  /**
+   * Wraps `rdd` in a [[CarbonDecoderRDD]] that is set up with an include profile made of the
+   * given attributes.
+   *
+   * @param logicalRelation          logical relation backed by a carbon relation
+   * @param projectExprsNeedToDecode attributes handed to the decoder as the include profile
+   * @param rdd                      scan RDD to wrap
+   * @param output                   output attributes passed on to the decoder RDD
+   */
+  def getDecoderRDD(
+      logicalRelation: LogicalRelation,
+      projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+      rdd: RDD[InternalRow],
+      output: Seq[Attribute]): RDD[InternalRow] = {
+    val carbonSource = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val decoderRelation = CarbonDecoderRelation(logicalRelation.attributeMap, carbonSource)
+    // re-create every attribute with the table name as qualifier and register it
+    val qualifiedAttrs = for (source <- projectExprsNeedToDecode) yield {
+      val qualified = AttributeReference(
+        source.name,
+        source.dataType,
+        source.nullable,
+        source.metadata)(source.exprId, Option(carbonSource.carbonRelation.tableName))
+      decoderRelation.addAttribute(qualified)
+      qualified
+    }
+
+    new CarbonDecoderRDD(
+      Seq(decoderRelation),
+      IncludeProfile(qualifiedAttrs),
+      CarbonAliasDecoderRelation(),
+      rdd,
+      output,
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
+      carbonSource.carbonTable.getTableInfo.serialize())
+  }
+
+  /**
+   * Configures vector reading on the scan RDD and, when columns still have to be decoded,
+   * wraps the scan in a decoder RDD.
+   *
+   * `rdd` must be a [[CarbonScanRDD]]; it is cast unconditionally.
+   */
+  private[this] def toCatalystRDD(
+      relation: LogicalRelation,
+      output: Seq[Attribute],
+      rdd: RDD[InternalRow],
+      needDecode: ArrayBuffer[AttributeReference]):
+  RDD[InternalRow] = {
+    val scanRdd = rdd.asInstanceOf[CarbonScanRDD]
+    if (needDecode.isEmpty) {
+      // nothing to decode: vector reading follows the batch-support check
+      scanRdd.setVectorReaderSupport(
+        supportBatchedDataSource(relation.relation.sqlContext, output))
+      rdd
+    } else {
+      // vector reading is switched off whenever a decoder RDD sits on top of the scan
+      scanRdd.setVectorReaderSupport(false)
+      getDecoderRDD(relation, needDecode, rdd, output)
+    }
+  }
+
+  /**
+   * Convenience entry point for [[pruneFilterProjectRaw]] for scan builders that only need
+   * the requested columns, the pushed filters as an array and the decode list.
+   */
+  protected def pruneFilterProject(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Array[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+    // adapt to the raw signature: the catalyst predicates are dropped
+    val rawScanBuilder = (columns: Seq[Attribute],
+        _: Seq[Expression],
+        pushed: Seq[Filter],
+        toDecode: ArrayBuffer[AttributeReference]) => {
+      scanBuilder(columns, pushed.toArray, toDecode)
+    }
+    pruneFilterProjectRaw(relation, projects, filterPredicates, rawScanBuilder)
+  }
+
+  /**
+   * Builds the physical plan for a projection and a list of filter predicates on top of a
+   * carbon relation.
+   *
+   * Predicates are split by `selectFilters` into pushed filters and predicates that still
+   * have to be evaluated after the scan. The result is either a scan with an optional
+   * `FilterExec` on top, or a `ProjectExec` over that, depending on whether column pruning
+   * alone is enough to produce the projection.
+   *
+   * @param relation         logical relation; its `relation` is cast to
+   *                         `CarbonDatasourceHadoopRelation`
+   * @param rawProjects      projection list as received from the logical plan
+   * @param filterPredicates filter predicates as received from the logical plan
+   * @param scanBuilder      creates the scan RDD from requested columns, candidate
+   *                         predicates, pushed filters and the attributes collected in
+   *                         `needDecoder`
+   */
+  protected def pruneFilterProjectRaw(
+      relation: LogicalRelation,
+      rawProjects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+    // unwrap CustomDeterministicExpression nodes so only the wrapped expression remains
+    val projects = rawProjects.map {p =>
+      p.transform {
+        case CustomDeterministicExpression(exp) => exp
+      }
+    }.asInstanceOf[Seq[NamedExpression]]
+
+    val projectSet = AttributeSet(projects.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+
+    val candidatePredicates = filterPredicates.map {
+      _ transform {
+        case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+      }
+    }
+
+    // unhandledPredicates must still be evaluated after the scan; pushedFilters go to the scan
+    val (unhandledPredicates, pushedFilters) =
+      selectFilters(relation.relation, candidatePredicates)
+
+    // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
+    // them from requested columns.
+    val handledSet = {
+      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+      AttributeSet(handledPredicates.flatMap(_.references)) --
+      (projectSet ++ unhandledSet).map(relation.attributeMap)
+    }
+
+    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
+    // `Filter`s or cannot be handled by `relation`.
+    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    // looked up by column name below; a defined `true` entry marks the column for the
+    // decode / IntegerType handling (presumably dictionary encoded columns - confirm)
+    val map = table.carbonRelation.metaData.dictionaryMap
+
+    // scan metadata; only carries the pushed filters, and only when there are any
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+      pairs.toMap
+    }
+
+
+    // attributes whose values have to be decoded; filled by the two steps below and then
+    // passed to the scan builder and to updateRequestedColumnsFunc
+    val needDecoder = ArrayBuffer[AttributeReference]()
+    // step 1: flagged attributes referenced by the filter that runs after the scan
+    filterCondition match {
+      case Some(exp: Expression) =>
+        exp.references.collect {
+          case attr: AttributeReference =>
+            val dict = map.get(attr.name)
+            if (dict.isDefined && dict.get) {
+              needDecoder += attr
+            }
+        }
+      case None =>
+    }
+
+    // step 2: flagged attributes referenced by projections that are neither a plain attribute
+    // nor an alias of one; `map` is used here only for its side effect on needDecoder
+    projects.map {
+      case attr: AttributeReference =>
+      case Alias(attr: AttributeReference, _) =>
+      case others =>
+        others.references.map { f =>
+          val dictionary = map.get(f.name)
+          if (dictionary.isDefined && dictionary.get) {
+            needDecoder += f.asInstanceOf[AttributeReference]
+          }
+        }
+    }
+
+    if (projects.map(_.toAttribute) == projects &&
+        projectSet.size == projects.size &&
+        filterSet.subsetOf(projectSet)) {
+      // When it is possible to just use column pruning to get the right projection and
+      // when the columns of this projection are enough to evaluate all filter conditions,
+      // just do a scan followed by a filter, with no extra project.
+      val requestedColumns = projects
+        // Safe due to if above.
+        .asInstanceOf[Seq[Attribute]]
+        // Match original case of attributes.
+        .map(relation.attributeMap)
+        // Don't request columns that are only referenced by pushed filters.
+        .filterNot(handledSet.contains)
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+      // scan output: flagged columns that are not in needDecoder are typed as IntegerType
+      val updateProject = projects.map { expr =>
+        var attr = expr.toAttribute.asInstanceOf[AttributeReference]
+        if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+          val dict = map.get(attr.name)
+          if (dict.isDefined && dict.get) {
+            attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
+              .exprId, attr.qualifier)
+          }
+        }
+        attr
+      }
+      val scan = getDataSourceScan(relation,
+        updateProject,
+        scanBuilder,
+        candidatePredicates,
+        pushedFilters,
+        metadata,
+        needDecoder,
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+    } else {
+
+      // extra attributes that the scan has to provide for the implicit-column aliases below
+      var newProjectList: Seq[Attribute] = Seq.empty
+      val updatedProjects = projects.map {
+          // position id / tuple id alias over a UDF: replaced by a string attribute of the
+          // same name that keeps the expression id of the alias
+          case a@Alias(s: ScalaUDF, name)
+            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+            val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
+            newProjectList :+= reference
+            reference
+          // segment id alias over a UDF: the alias is kept, but the UDF is rebuilt to take
+          // a tuple id attribute as its only input
+          case a@Alias(s: ScalaUDF, name)
+            if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
+            val reference =
+              AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+                StringType, true)().withExprId(a.exprId)
+            newProjectList :+= reference
+            a.transform {
+              case s: ScalaUDF =>
+                ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+            }
+          case other => other
+      }
+      // Don't request columns that are only referenced by pushed filters.
+      val requestedColumns =
+        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+      // the requested columns serve both as scan output and as the columns to read
+      val scan = getDataSourceScan(relation,
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]],
+        scanBuilder,
+        candidatePredicates,
+        pushedFilters,
+        metadata,
+        needDecoder,
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+      execution.ProjectExec(
+        updateRequestedColumnsFunc(updatedProjects, table,
+          needDecoder).asInstanceOf[Seq[NamedExpression]],
+        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+    }
+  }
+
+  /**
+   * Creates the physical scan node for a carbon relation.
+   *
+   * A [[BatchedDataSourceScanExec]] is used when batch reading is supported for the requested
+   * columns and `needDecoder` is empty; otherwise a [[RowDataSourceScanExec]] is used. Both
+   * variants receive the same RDD, partitioning, metadata and table identifier.
+   *
+   * @param output                 output attributes of the scan node
+   * @param scanBuilder            creates the scan RDD
+   * @param updateRequestedColumns columns handed to `scanBuilder` and to the partitioning
+   */
+  def getDataSourceScan(relation: LogicalRelation,
+      output: Seq[Attribute],
+      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow],
+      candidatePredicates: Seq[Expression],
+      pushedFilters: Seq[Filter],
+      metadata: Map[String, String],
+      needDecoder: ArrayBuffer[AttributeReference],
+      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
+    val carbonSource = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val useBatchedScan =
+      supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
+      needDecoder.isEmpty
+    // shared by both scan variants; the RDD is built before the partitioning is computed
+    val scanRdd =
+      scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder)
+    val partitioning = getPartitioning(carbonSource.carbonTable, updateRequestedColumns)
+    val tableIdentifier = relation.catalogTable.map(_.identifier)
+    if (useBatchedScan) {
+      BatchedDataSourceScanExec(
+        output, scanRdd, relation.relation, partitioning, metadata, tableIdentifier)
+    } else {
+      RowDataSourceScanExec(
+        output, scanRdd, relation.relation, partitioning, metadata, tableIdentifier)
+    }
+  }
+
+  /**
+   * Re-types requested columns for the scan.
+   *
+   * An attribute (plain, or the direct child of an alias) that is flagged `true` in the
+   * relation's `dictionaryMap` and is not listed in `needDecoder` is replaced by an attribute
+   * with the same name, nullability, metadata, expression id and qualifier but of
+   * `IntegerType`. All other expressions are returned unchanged.
+   *
+   * @param requestedColumns expressions to inspect
+   * @param relation         carbon relation that supplies the `dictionaryMap`
+   * @param needDecoder      attributes that must keep their original type; names are
+   *                         compared case-insensitively
+   */
+  def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
+      relation: CarbonDatasourceHadoopRelation,
+      needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
+    val dictionaryFlags = relation.carbonRelation.metaData.dictionaryMap
+
+    // true when the column has to be left untouched; the flag lookup only happens for
+    // columns that are not in needDecoder
+    def keepsOriginalType(column: AttributeReference): Boolean = {
+      needDecoder.exists(_.name.equalsIgnoreCase(column.name)) ||
+      !dictionaryFlags.get(column.name).exists(flag => flag)
+    }
+
+    // same attribute, but typed as IntegerType
+    def asIntegerTyped(column: AttributeReference): AttributeReference = {
+      AttributeReference(
+        column.name,
+        IntegerType,
+        column.nullable,
+        column.metadata)(column.exprId, column.qualifier)
+    }
+
+    requestedColumns.map {
+      case column: AttributeReference =>
+        if (keepsOriginalType(column)) {
+          column
+        } else {
+          asIntegerTyped(column)
+        }
+      case aliased @ Alias(column: AttributeReference, _) =>
+        if (keepsOriginalType(column)) {
+          aliased
+        } else {
+          aliased.transform {
+            case _: AttributeReference => asIntegerTyped(column)
+          }
+        }
+      case other => other
+    }
+  }
+
+  /**
+   * Derives the output partitioning of a scan from the bucketing information of the table.
+   *
+   * @param carbonTable table whose bucketing information is read
+   * @param output      attributes produced by the scan
+   * @return a `HashPartitioning` over the bucket columns when the table has bucketing
+   *         information and every bucket column is found in `output` (names are compared
+   *         case-insensitively); `UnknownPartitioning(0)` otherwise
+   */
+  private def getPartitioning(carbonTable: CarbonTable,
+      output: Seq[Attribute]): Partitioning = {
+    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+    if (info != null) {
+      val cols = info.getListOfColumns.asScala
+      val numBuckets = info.getNumberOfBuckets
+      // one attribute per bucket column found in the output, typed with the spark
+      // equivalent of the carbon data type of the bucket column
+      val bucketColumns = cols.flatMap { n =>
+        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+        attrRef match {
+          case Some(attr: AttributeReference) =>
+            Some(AttributeReference(attr.name,
+              CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier))
+          case _ => None
+        }
+      }
+      // hash partitioning can only be reported when no bucket column is missing
+      if (bucketColumns.size == cols.size) {
+        HashPartitioning(bucketColumns, numBuckets)
+      } else {
+        UnknownPartitioning(0)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
+  /**
+   * Splits catalyst predicates into the part that must still be evaluated after the scan and
+   * the data source filters that are handed to the scan.
+   *
+   * @param relation   relation that is asked which filters it cannot handle
+   * @param predicates catalyst predicates to classify
+   * @return a pair of (predicates that have no data source filter or whose filter is
+   *         reported as unhandled by `relation`, all translated data source filters)
+   */
+  protected[sql] def selectFilters(
+      relation: BaseRelation,
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+
+    // "predicate" always means a catalyst expression here, "filter" a data source filter
+
+    // every predicate that has a data source filter, paired with that filter
+    val translated: Seq[(Expression, Filter)] =
+      predicates.flatMap { predicate =>
+        translateFilter(predicate).map(filter => predicate -> filter)
+      }
+
+    // lookup from predicate to its filter
+    val filterByPredicate: Map[Expression, Filter] = translated.toMap
+
+    // predicates without a data source filter
+    val untranslatable = predicates.filterNot(filterByPredicate.contains)
+
+    // filters the relation may not be able to apply to every row of the data set
+    val notHandledByRelation =
+      relation.unhandledFilters(filterByPredicate.values.toArray).toSet
+
+    // predicates that have a filter, but one the relation does not fully handle
+    val translatedButUnhandled = translated.collect {
+      case (predicate, filter) if notHandledByRelation.contains(filter) => predicate
+    }
+
+    // every translated filter is pushed to the data source, whether or not the source can
+    // apply it to every row
+    val allTranslatedFilters = translated.map { case (_, filter) => filter }
+
+    (untranslatable ++ translatedButUnhandled, allTranslatedFilters)
+  }
+
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   *
+   * @param predicate the catalyst expression to translate
+   * @param or        `true` when `predicate` sits below an `Or`. In that case an `And` is
+   *                  only translated when both of its sides are translatable; outside of an
+   *                  `Or` an `And` with a single translatable side yields that side alone.
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = {
+    predicate match {
+      case Or(left, right) =>
+
+        val leftFilter = translateFilter(left, true)
+        val rightFilter = translateFilter(right, true)
+        if (leftFilter.isDefined && rightFilter.isDefined) {
+          Some(sources.Or(leftFilter.get, rightFilter.get))
+        } else {
+          None
+        }
+
+      case And(left, right) =>
+        // Each side is translated exactly once and the result is reused. Translating the
+        // sides a second time, as was done before, doubles the work at every And level and
+        // cannot change the outcome: whenever a translation with or = true is defined, the
+        // translation with or = false is defined as well and yields the same filter.
+        val leftFilter = translateFilter(left, or)
+        val rightFilter = translateFilter(right, or)
+        if (or) {
+          // below an Or, dropping one side of the And would change the result of the Or
+          for (l <- leftFilter; r <- rightFilter) yield sources.And(l, r)
+        } else {
+          (leftFilter ++ rightFilter).reduceOption(sources.And)
+        }
+      case EqualTo(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(l@Literal(v, t), a: Attribute) =>
+        Some(sources.EqualTo(a.name, v))
+      case c@EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+      case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+      // the In / Not(In) cases only apply when every list element is a literal
+      case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.Not(sources.In(a.name, hSet.toArray)))
+      case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.In(a.name, hSet.toArray))
+      case c@Not(In(Cast(a: Attribute, _), list))
+        if !list.exists(!_.isInstanceOf[Literal]) =>
+        Some(CastExpr(c))
+      case c@In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        Some(CastExpr(c))
+      case InSet(a: Attribute, set) =>
+        Some(sources.In(a.name, set.toArray))
+      case Not(InSet(a: Attribute, set)) =>
+        Some(sources.Not(sources.In(a.name, set.toArray)))
+      // for comparisons with the literal on the left, the operator is mirrored
+      case GreaterThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case GreaterThan(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThan(a.name, v))
+      case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case LessThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThan(a.name, v))
+      case LessThan(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, v))
+      case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case StartsWith(a: Attribute, Literal(v, t)) =>
+        Some(sources.StringStartsWith(a.name, v.toString))
+      case c@EndsWith(a: Attribute, Literal(v, t)) =>
+        Some(CarbonEndsWith(c))
+      case c@Contains(a: Attribute, Literal(v, t)) =>
+        Some(CarbonContainsWith(c))
+      case others => None
+    }
+  }
+
+  /**
+   * Decides whether a scan over `cols` can be read in batches.
+   *
+   * The vector reader setting is taken from the first place that defines it: the session
+   * configuration, then the JVM system properties, then the carbon properties (with their
+   * default value).
+   *
+   * @return `true` only when whole stage code generation is enabled, `cols` does not exceed
+   *         the configured maximum number of fields, the vector reader setting is `true`
+   *         and every column has an atomic data type
+   */
+  def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = {
+    val readerKey = CarbonCommonConstants.ENABLE_VECTOR_READER
+    val sessionConf = sqlContext.sparkSession.conf
+    val vectorReaderSetting: String =
+      if (sessionConf.contains(readerKey)) {
+        sessionConf.get(readerKey)
+      } else {
+        // the carbon properties are only consulted when no system property is set
+        Option(System.getProperty(readerKey)).getOrElse(
+          CarbonProperties.getInstance().getProperty(readerKey,
+            CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT))
+      }
+    val sparkConf = sqlContext.conf
+    val codegenUsable =
+      sparkConf.wholeStageEnabled && sparkConf.wholeStageMaxNumFields >= cols.size
+    // the setting is parsed only when code generation is usable
+    codegenUsable &&
+    vectorReaderSetting.toBoolean &&
+    cols.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
new file mode 100644
index 0000000..715af1d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
+import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
+import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Planner strategy that maps DDL and load plans on carbon tables to carbon commands.
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+  /** Whether the carbon metastore of this session reports `identifier` as an existing table. */
+  private def isCarbonTable(identifier: TableIdentifier): Boolean =
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.tableExists(identifier)(sparkSession)
+
+  /**
+   * Plans the commands handled by this strategy.
+   *
+   * @param plan logical plan to convert
+   * @return a single ExecutedCommandExec for a handled command, Nil for any other plan
+   * @throws MalformedCarbonCommandException for compaction, data type change, add column and
+   *         drop column commands on a table unknown to the carbon metastore, and for a
+   *         compaction type other than "minor" or "major"
+   */
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case LoadDataCommand(identifier, path, _, isOverwrite, _) if isCarbonTable(identifier) =>
+        ExecutedCommandExec(
+          LoadTableCommand(
+            identifier.database,
+            identifier.table.toLowerCase,
+            path,
+            Seq(),
+            Map(),
+            isOverwrite)) :: Nil
+      case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
+        val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
+        val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
+        if (isCarbonTable(tableIdentifier)) {
+          val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
+          ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
+        } else {
+          ExecutedCommandExec(alter) :: Nil
+        }
+      case DropTableCommand(identifier, ifExists, _, _)
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .isTablePathExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(
+          CarbonDropTableCommand(ifExists, identifier.database,
+            identifier.table.toLowerCase)) :: Nil
+      case ShowLoadsCommand(databaseName, table, limit) =>
+        ExecutedCommandExec(
+          CarbonShowLoadsCommand(
+            databaseName,
+            table.toLowerCase,
+            limit,
+            plan.output)) :: Nil
+      case InsertIntoCarbonTable(
+          relation: CarbonDatasourceHadoopRelation, _, child: LogicalPlan, overwrite, _) =>
+        ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite.enabled)) :: Nil
+      case createDb@CreateDatabaseCommand(dbName, _, _, _, _) =>
+        // NOTE(review): the directory is created here, as a side effect of planning.
+        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
+        ExecutedCommandExec(createDb) :: Nil
+      case drop@DropDatabaseCommand(_, _, _) =>
+        ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
+      case alterTable@AlterTableCompactionCommand(altertablemodel) =>
+        if (isCarbonTable(TableIdentifier(altertablemodel.tableName, altertablemodel.dbName))) {
+          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
+              altertablemodel.compactionType.equalsIgnoreCase("major")) {
+            ExecutedCommandExec(alterTable) :: Nil
+          } else {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on carbon table")
+          }
+        } else {
+          throw new MalformedCarbonCommandException(
+            "Operation not allowed : " + altertablemodel.alterSql)
+        }
+      case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
+        val tableIdentifier = TableIdentifier(alterTableChangeDataTypeModel.tableName,
+          alterTableChangeDataTypeModel.databaseName)
+        if (isCarbonTable(tableIdentifier)) {
+          ExecutedCommandExec(dataTypeChange) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
+        val tableIdentifier = TableIdentifier(alterTableAddColumnsModel.tableName,
+          alterTableAddColumnsModel.databaseName)
+        if (isCarbonTable(tableIdentifier)) {
+          ExecutedCommandExec(addColumn) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
+        val tableIdentifier = TableIdentifier(alterTableDropColumnModel.tableName,
+          alterTableDropColumnModel.databaseName)
+        if (isCarbonTable(tableIdentifier)) {
+          ExecutedCommandExec(dropColumn) :: Nil
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case DescribeTableCommand(identifier, _, _, isFormatted)
+        if isCarbonTable(identifier) && isFormatted =>
+        val resolvedTable =
+          sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+        val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+        ExecutedCommandExec(
+          CarbonDescribeFormattedCommand(
+            resultPlan,
+            plan.output,
+            identifier)) :: Nil
+      case show@ShowPartitionsCommand(t, _) =>
+        if (isCarbonTable(t)) {
+          ExecutedCommandExec(ShowCarbonPartitionsCommand(t)) :: Nil
+        } else {
+          ExecutedCommandExec(show) :: Nil
+        }
+      case set@SetCommand(_) =>
+        ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
+      case ResetCommand =>
+        ExecutedCommandExec(CarbonResetCommand()) :: Nil
+      // Option.contains avoids Option.get and yields false when no provider is set.
+      case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
+        if tableDesc.provider.contains("org.apache.spark.sql.CarbonSource") =>
+        val updatedCatalog =
+          CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
+        val cmd =
+          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
+      case _ => Nil
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index c9fc46c..f61ab84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{ProjectExec, SparkSqlParser, SubqueryExec}
-import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
+import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -167,13 +167,13 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
       updatedSelectPlan
     }
     val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
-    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sparkSession))
+    val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
     val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
     ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
   }
 
   def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
-   val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
+   val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
      table.tableIdentifier.table)
     var addedTupleId = false
     val parsePlan = parser.parsePlan(selectStmt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1fe6c83..478b178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -25,16 +25,13 @@ import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubqu
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.DDLStrategy
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
-import org.apache.carbondata.processing.merger.TableMeta
-
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index f435fa6..1d8bb8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -26,7 +27,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {
 
-  override val output = command.output
+  override val output: Seq[Attribute] = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val dbName = command.databaseName
@@ -50,7 +51,7 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
 case class CarbonSetCommand(command: SetCommand)
   extends RunnableCommand {
 
-  override val output = command.output
+  override val output: Seq[Attribute] = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index c6dd905..2ddde7a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{ProjectForUpdateCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.mutation.ProjectForUpdateCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{IntegerType, StringType}