You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/27 06:34:09 UTC

[6/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration

[CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration

This closes #1379


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6627cac0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6627cac0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6627cac0

Branch: refs/heads/master
Commit: 6627cac0ccdfa8cc94cda188f211112e82ec3294
Parents: 2ec69f6
Author: Jacky Li <ja...@qq.com>
Authored: Sun Sep 24 23:43:20 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Sep 27 14:32:20 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonCatalystOperators.scala     |   16 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |    4 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |    4 +-
 .../org/apache/spark/sql/CarbonSource.scala     |    2 +-
 .../spark/sql/SparkUnknownExpression.scala      |    1 -
 .../execution/CarbonLateDecodeStrategy.scala    |  553 --------
 .../execution/command/AlterTableCommands.scala  |  460 ------
 .../command/CarbonCreateTableCommand.scala      |  107 ++
 .../CarbonDescribeFormattedCommand.scala        |  133 ++
 .../command/CarbonDropTableCommand.scala        |   94 ++
 .../sql/execution/command/DDLStrategy.scala     |  143 --
 .../sql/execution/command/IUDCommands.scala     |  857 ------------
 .../execution/command/carbonTableSchema.scala   | 1315 ------------------
 .../AlterTableCompactionCommand.scala           |   90 ++
 .../management/CarbonShowLoadsCommand.scala     |   50 +
 .../command/management/CleanFilesCommand.scala  |   58 +
 .../management/DeleteLoadByIdCommand.scala      |   48 +
 .../DeleteLoadByLoadDateCommand.scala           |   50 +
 .../management/LoadTableByInsertCommand.scala   |   53 +
 .../command/management/LoadTableCommand.scala   |  520 +++++++
 .../command/mutation/DeleteExecution.scala      |  322 +++++
 .../command/mutation/HorizontalCompaction.scala |  246 ++++
 .../HorizontalCompactionException.scala         |   24 +
 .../mutation/ProjectForDeleteCommand.scala      |  105 ++
 .../mutation/ProjectForUpdateCommand.scala      |  228 +++
 .../spark/sql/execution/command/package.scala   |   53 +
 .../AlterTableDropCarbonPartitionCommand.scala  |  176 +++
 .../AlterTableSplitCarbonPartitionCommand.scala |  187 +++
 .../partition/ShowCarbonPartitionsCommand.scala |   60 +
 .../schema/AlterTableAddColumnCommand.scala     |  115 ++
 .../AlterTableDataTypeChangeCommand.scala       |  116 ++
 .../schema/AlterTableDropColumnCommand.scala    |  148 ++
 .../schema/AlterTableRenameTableCommand.scala   |  174 +++
 .../strategy/CarbonLateDecodeStrategy.scala     |  554 ++++++++
 .../sql/execution/strategy/DDLStrategy.scala    |  162 +++
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   10 +-
 .../spark/sql/hive/CarbonSessionState.scala     |    7 +-
 .../execution/command/CarbonHiveCommands.scala  |    5 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |    3 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   41 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |    4 +-
 .../org/apache/spark/util/Compaction.scala      |    5 +-
 .../org/apache/spark/util/TableLoader.scala     |    4 +-
 .../vectorreader/VectorReaderTestCase.scala     |    2 +-
 44 files changed, 3928 insertions(+), 3381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 0c3414a..6815629 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -65,7 +65,7 @@ case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
-object getDB {
+object GetDB {
 
   def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
     dbName.getOrElse(
@@ -76,7 +76,10 @@ object getDB {
 /**
  * Shows Loads in a table
  */
-case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
+case class ShowLoadsCommand(
+    databaseNameOp: Option[String],
+    table: String,
+    limit: Option[String])
   extends Command {
 
   override def output: Seq[Attribute] = {
@@ -111,15 +114,6 @@ case class DeleteRecords(
 }
 
 /**
- * Describe formatted for hive table
- */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends Command {
-
-  override def output: Seq[AttributeReference] =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-/**
  * A logical plan representing insertion into Hive table
  * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable
  * because Hive Table doesn't have nullability for ARRAY, MAP,STRUCT types.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index e9f2758..a12d86b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.command.management.LoadTableCommand
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -130,7 +130,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadDataFrame(options: CarbonOption): Unit = {
     val header = dataFrame.columns.mkString(",")
-    LoadTable(
+    LoadTableCommand(
       Some(options.dbName),
       options.tableName,
       null,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 4f4515d..d5adc2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.command.LoadTableByInsert
+import org.apache.spark.sql.execution.command.management.LoadTableByInsertCommand
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
@@ -95,7 +95,7 @@ case class CarbonDatasourceHadoopRelation(
           CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
     }
     if (data.logicalPlan.output.size >= carbonRelation.output.size) {
-      LoadTableByInsert(this, data.logicalPlan, overwrite).run(sparkSession)
+      LoadTableByInsertCommand(this, data.logicalPlan, overwrite).run(sparkSession)
     } else {
       sys.error("Cannot insert into target table because column number are different")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1b021b0..f4f8b75 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -24,8 +24,8 @@ import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
 import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 50d7dba..d536746 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -21,7 +21,6 @@ import java.util.{ArrayList, List}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.CarbonBoundReference
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericInternalRow}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
deleted file mode 100644
index 4d8e7ac..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ /dev/null
@@ -1,553 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.BucketingInfo
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * Carbon specific optimization for late decode (convert dictionary key to value as late as
- * possible), which can improve the aggregation performance and reduce memory usage
- */
-private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
-  val PUSHED_FILTERS = "PushedFilters"
-
-  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-    plan match {
-      case PhysicalOperation(projects, filters, l: LogicalRelation)
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-        pruneFilterProject(
-          l,
-          projects,
-          filters,
-          (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
-            a.map(_.name).toArray, f), needDecoder)) :: Nil
-      case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-        if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
-            !CarbonDictionaryDecoder.
-              isRequiredToDecode(CarbonDictionaryDecoder.
-                getDictionaryColumnMapping(child.output, relations, profile, aliasMap))) {
-          planLater(child) :: Nil
-        } else {
-          CarbonDictionaryDecoder(relations,
-            profile,
-            aliasMap,
-            planLater(child),
-            SparkSession.getActiveSession.get
-          ) :: Nil
-        }
-      case _ => Nil
-    }
-  }
-
-
-  def getDecoderRDD(
-      logicalRelation: LogicalRelation,
-      projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
-      rdd: RDD[InternalRow],
-      output: Seq[Attribute]): RDD[InternalRow] = {
-    val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
-      logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
-    val attrs = projectExprsNeedToDecode.map { attr =>
-      val newAttr = AttributeReference(attr.name,
-        attr.dataType,
-        attr.nullable,
-        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
-      relation.addAttribute(newAttr)
-      newAttr
-    }
-
-    new CarbonDecoderRDD(
-      Seq(relation),
-      IncludeProfile(attrs),
-      CarbonAliasDecoderRelation(),
-      rdd,
-      output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
-      table.carbonTable.getTableInfo.serialize())
-  }
-
-  private[this] def toCatalystRDD(
-      relation: LogicalRelation,
-      output: Seq[Attribute],
-      rdd: RDD[InternalRow],
-      needDecode: ArrayBuffer[AttributeReference]):
-  RDD[InternalRow] = {
-    if (needDecode.nonEmpty) {
-      rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
-      getDecoderRDD(relation, needDecode, rdd, output)
-    } else {
-      rdd.asInstanceOf[CarbonScanRDD]
-        .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
-      rdd
-    }
-  }
-
-  protected def pruneFilterProject(
-      relation: LogicalRelation,
-      projects: Seq[NamedExpression],
-      filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Array[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
-    pruneFilterProjectRaw(
-      relation,
-      projects,
-      filterPredicates,
-      (requestedColumns, _, pushedFilters, a) => {
-        scanBuilder(requestedColumns, pushedFilters.toArray, a)
-      })
-  }
-
-  protected def pruneFilterProjectRaw(
-      relation: LogicalRelation,
-      rawProjects: Seq[NamedExpression],
-      filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
-    val projects = rawProjects.map {p =>
-      p.transform {
-        case CustomDeterministicExpression(exp) => exp
-      }
-    }.asInstanceOf[Seq[NamedExpression]]
-
-    val projectSet = AttributeSet(projects.flatMap(_.references))
-    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-
-    val candidatePredicates = filterPredicates.map {
-      _ transform {
-        case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
-      }
-    }
-
-    val (unhandledPredicates, pushedFilters) =
-      selectFilters(relation.relation, candidatePredicates)
-
-    // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
-    // them from requested columns.
-    val handledSet = {
-      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
-      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
-      AttributeSet(handledPredicates.flatMap(_.references)) --
-      (projectSet ++ unhandledSet).map(relation.attributeMap)
-    }
-
-    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
-    // `Filter`s or cannot be handled by `relation`.
-    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
-    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    val map = table.carbonRelation.metaData.dictionaryMap
-
-    val metadata: Map[String, String] = {
-      val pairs = ArrayBuffer.empty[(String, String)]
-
-      if (pushedFilters.nonEmpty) {
-        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
-      }
-      pairs.toMap
-    }
-
-
-    val needDecoder = ArrayBuffer[AttributeReference]()
-    filterCondition match {
-      case Some(exp: Expression) =>
-        exp.references.collect {
-          case attr: AttributeReference =>
-            val dict = map.get(attr.name)
-            if (dict.isDefined && dict.get) {
-              needDecoder += attr
-            }
-        }
-      case None =>
-    }
-
-    projects.map {
-      case attr: AttributeReference =>
-      case Alias(attr: AttributeReference, _) =>
-      case others =>
-        others.references.map { f =>
-          val dictionary = map.get(f.name)
-          if (dictionary.isDefined && dictionary.get) {
-            needDecoder += f.asInstanceOf[AttributeReference]
-          }
-        }
-    }
-
-    if (projects.map(_.toAttribute) == projects &&
-        projectSet.size == projects.size &&
-        filterSet.subsetOf(projectSet)) {
-      // When it is possible to just use column pruning to get the right projection and
-      // when the columns of this projection are enough to evaluate all filter conditions,
-      // just do a scan followed by a filter, with no extra project.
-      val requestedColumns = projects
-        // Safe due to if above.
-        .asInstanceOf[Seq[Attribute]]
-        // Match original case of attributes.
-        .map(relation.attributeMap)
-        // Don't request columns that are only referenced by pushed filters.
-        .filterNot(handledSet.contains)
-      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
-
-      val updateProject = projects.map { expr =>
-        var attr = expr.toAttribute.asInstanceOf[AttributeReference]
-        if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
-          val dict = map.get(attr.name)
-          if (dict.isDefined && dict.get) {
-            attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
-              .exprId, attr.qualifier)
-          }
-        }
-        attr
-      }
-      val scan = getDataSourceScan(relation,
-        updateProject,
-        scanBuilder,
-        candidatePredicates,
-        pushedFilters,
-        metadata,
-        needDecoder,
-        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
-    } else {
-
-      var newProjectList: Seq[Attribute] = Seq.empty
-      val updatedProjects = projects.map {
-          case a@Alias(s: ScalaUDF, name)
-            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
-            val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
-            newProjectList :+= reference
-            reference
-          case a@Alias(s: ScalaUDF, name)
-            if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
-            val reference =
-              AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
-                StringType, true)().withExprId(a.exprId)
-            newProjectList :+= reference
-            a.transform {
-              case s: ScalaUDF =>
-                ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
-            }
-          case other => other
-      }
-      // Don't request columns that are only referenced by pushed filters.
-      val requestedColumns =
-        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
-      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
-      val scan = getDataSourceScan(relation,
-        updateRequestedColumns.asInstanceOf[Seq[Attribute]],
-        scanBuilder,
-        candidatePredicates,
-        pushedFilters,
-        metadata,
-        needDecoder,
-        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      execution.ProjectExec(
-        updateRequestedColumnsFunc(updatedProjects, table,
-          needDecoder).asInstanceOf[Seq[NamedExpression]],
-        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
-    }
-  }
-
-  def getDataSourceScan(relation: LogicalRelation,
-      output: Seq[Attribute],
-      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow],
-      candidatePredicates: Seq[Expression],
-      pushedFilters: Seq[Filter],
-      metadata: Map[String, String],
-      needDecoder: ArrayBuffer[AttributeReference],
-      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
-    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
-        needDecoder.isEmpty) {
-      BatchedDataSourceScanExec(
-        output,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        relation.relation,
-        getPartitioning(table.carbonTable, updateRequestedColumns),
-        metadata,
-        relation.catalogTable.map(_.identifier))
-    } else {
-      RowDataSourceScanExec(output,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        relation.relation,
-        getPartitioning(table.carbonTable, updateRequestedColumns),
-        metadata,
-        relation.catalogTable.map(_.identifier))
-    }
-  }
-
-  def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
-      relation: CarbonDatasourceHadoopRelation,
-      needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
-    val map = relation.carbonRelation.metaData.dictionaryMap
-    requestedColumns.map {
-      case attr: AttributeReference =>
-        if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
-          attr
-        } else {
-          val dict = map.get(attr.name)
-          if (dict.isDefined && dict.get) {
-            AttributeReference(attr.name,
-              IntegerType,
-              attr.nullable,
-              attr.metadata)(attr.exprId, attr.qualifier)
-          } else {
-            attr
-          }
-        }
-      case alias @ Alias(attr: AttributeReference, name) =>
-        if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
-          alias
-        } else {
-          val dict = map.get(attr.name)
-          if (dict.isDefined && dict.get) {
-            alias.transform {
-              case attrLocal: AttributeReference =>
-                AttributeReference(attr.name,
-                  IntegerType,
-                  attr.nullable,
-                  attr.metadata)(attr.exprId, attr.qualifier)
-            }
-          } else {
-            alias
-          }
-        }
-      case others => others
-    }
-  }
-
-  private def getPartitioning(carbonTable: CarbonTable,
-      output: Seq[Attribute]): Partitioning = {
-    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
-    if (info != null) {
-      val cols = info.getListOfColumns.asScala
-      val sortColumn = carbonTable.
-        getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
-      val numBuckets = info.getNumberOfBuckets
-      val bucketColumns = cols.flatMap { n =>
-        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
-        attrRef match {
-          case Some(attr: AttributeReference) =>
-            Some(AttributeReference(attr.name,
-              CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
-              attr.nullable,
-              attr.metadata)(attr.exprId, attr.qualifier))
-          case _ => None
-        }
-      }
-      if (bucketColumns.size == cols.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    } else {
-      UnknownPartitioning(0)
-    }
-  }
-
-  protected[sql] def selectFilters(
-      relation: BaseRelation,
-      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
-
-    // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
-    // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
-    // `filter`s.
-
-    val translated: Seq[(Expression, Filter)] =
-      for {
-        predicate <- predicates
-        filter <- translateFilter(predicate)
-      } yield predicate -> filter
-
-    // A map from original Catalyst expressions to corresponding translated data source filters.
-    val translatedMap: Map[Expression, Filter] = translated.toMap
-
-    // Catalyst predicate expressions that cannot be translated to data source filters.
-    val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
-
-    // Data source filters that cannot be handled by `relation`. The semantic of a unhandled filter
-    // at here is that a data source may not be able to apply this filter to every row
-    // of the underlying dataset.
-    val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
-
-    val (unhandled, handled) = translated.partition {
-      case (predicate, filter) =>
-        unhandledFilters.contains(filter)
-    }
-
-    // Catalyst predicate expressions that can be translated to data source filters, but cannot be
-    // handled by `relation`.
-    val (unhandledPredicates, _) = unhandled.unzip
-
-    // Translated data source filters that can be handled by `relation`
-    val (_, handledFilters) = handled.unzip
-
-    // translated contains all filters that have been converted to the public Filter interface.
-    // We should always push them to the data source no matter whether the data source can apply
-    // a filter to every row or not.
-    val (_, translatedFilters) = translated.unzip
-
-    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
-  }
-
-
-  /**
-   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
-   *
-   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
-   */
-  protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = {
-    predicate match {
-      case or@Or(left, right) =>
-
-        val leftFilter = translateFilter(left, true)
-        val rightFilter = translateFilter(right, true)
-        if (leftFilter.isDefined && rightFilter.isDefined) {
-          Some(sources.Or(leftFilter.get, rightFilter.get))
-        } else {
-          None
-        }
-
-      case And(left, right) =>
-        val leftFilter = translateFilter(left, or)
-        val rightFilter = translateFilter(right, or)
-        if (or) {
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
-          } else {
-            None
-          }
-        } else {
-          (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
-        }
-      case EqualTo(a: Attribute, Literal(v, t)) =>
-        Some(sources.EqualTo(a.name, v))
-      case EqualTo(l@Literal(v, t), a: Attribute) =>
-        Some(sources.EqualTo(a.name, v))
-      case c@EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case Not(EqualTo(a: Attribute, Literal(v, t))) =>
-        Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Literal(v, t), a: Attribute)) =>
-        Some(sources.Not(sources.EqualTo(a.name, v)))
-      case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
-      case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
-      case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        Some(sources.Not(sources.In(a.name, hSet.toArray)))
-      case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        Some(sources.In(a.name, hSet.toArray))
-      case c@Not(In(Cast(a: Attribute, _), list))
-        if !list.exists(!_.isInstanceOf[Literal]) =>
-        Some(CastExpr(c))
-      case c@In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
-        Some(CastExpr(c))
-      case InSet(a: Attribute, set) =>
-        Some(sources.In(a.name, set.toArray))
-      case Not(InSet(a: Attribute, set)) =>
-        Some(sources.Not(sources.In(a.name, set.toArray)))
-      case GreaterThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThan(a.name, v))
-      case GreaterThan(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThan(a.name, v))
-      case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case LessThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThan(a.name, v))
-      case LessThan(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThan(a.name, v))
-      case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThanOrEqual(a.name, v))
-      case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThanOrEqual(a.name, v))
-      case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThanOrEqual(a.name, v))
-      case LessThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThanOrEqual(a.name, v))
-      case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-        CastExpressionOptimization.checkIfCastCanBeRemove(c)
-      case StartsWith(a: Attribute, Literal(v, t)) =>
-        Some(sources.StringStartsWith(a.name, v.toString))
-      case c@EndsWith(a: Attribute, Literal(v, t)) =>
-        Some(CarbonEndsWith(c))
-      case c@Contains(a: Attribute, Literal(v, t)) =>
-        Some(CarbonContainsWith(c))
-      case others => None
-    }
-  }
-
-  def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = {
-    val vectorizedReader = {
-      if (sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
-        sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
-      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
-        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
-      } else {
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
-          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
-      }
-    }
-    val supportCodegen =
-      sqlContext.conf.wholeStageEnabled && sqlContext.conf.wholeStageMaxNumFields >= cols.size
-    supportCodegen && vectorizedReader.toBoolean &&
-    cols.forall(_.dataType.isInstanceOf[AtomicType])
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
deleted file mode 100644
index 17e456d..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.language.implicitConversions
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
-
-private[sql] case class AlterTableAddColumns(
-    alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = alterTableAddColumnsModel.tableName
-    val dbName = alterTableAddColumnsModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      // Consider a concurrent scenario where 2 alter operations are executed in parallel. 1st
-      // operation is success and updates the schema file. 2nd operation will get the lock after
-      // completion of 1st operation but as look up relation is called before it will have the
-      // older carbon table and this can lead to inconsistent state in the system. Therefor look
-      // up relation should be called after acquiring the lock
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      // get the latest carbon table and check for column existence
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getStorePath)
-      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
-        dbName,
-        wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath, sparkSession.sparkContext).process
-      // generate dictionary files for the newly added columns
-      new AlterTableAddColumnRDD(sparkSession.sparkContext,
-        newCols,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
-      schemaEvolutionEntry.setTimeStamp(timeStamp)
-      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
-          thriftTable)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, "Alter table add columns failed")
-        if (newCols.nonEmpty) {
-          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
-          new AlterTableDropColumnRDD(sparkSession.sparkContext,
-            newCols,
-            carbonTable.getCarbonTableIdentifier,
-            carbonTable.getStorePath).collect()
-          AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table add operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}
-
-private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableRenameModel)
-  extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
-    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
-    val oldDatabaseName = oldTableIdentifier.database
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    val newDatabaseName = newTableIdentifier.database
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
-      throw new MalformedCarbonCommandException("Database name should be same for both tables")
-    }
-    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
-    if (tableExists) {
-      throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
-                                                s"already exists")
-    }
-    val oldTableName = oldTableIdentifier.table.toLowerCase
-    val newTableName = newTableIdentifier.table.toLowerCase
-    LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
-    LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val relation: CarbonRelation =
-      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"Rename table request has failed. " +
-                   s"Table $oldDatabaseName.$oldTableName does not exist")
-      sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
-    }
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-      LockUsage.COMPACTION_LOCK,
-      LockUsage.DELETE_SEGMENT_LOCK,
-      LockUsage.CLEAN_FILES_LOCK,
-      LockUsage.DROP_TABLE_LOCK)
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
-            sparkSession)
-      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta
-      carbonTable = tableMeta.carbonTable
-      // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.
-        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
-      val tableMetadataFile = carbonTablePath.getPath
-      val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
-      schemaEvolutionEntry.setTableName(newTableName)
-      timeStamp = System.currentTimeMillis()
-      schemaEvolutionEntry.setTime_stamp(timeStamp)
-      renameBadRecords(oldTableName, newTableName, oldDatabaseName)
-      val fileType = FileFactory.getFileType(tableMetadataFile)
-      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       newTableName)
-        if (!rename) {
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
-        }
-      }
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
-        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
-          carbonTable.getCarbonTableIdentifier,
-          tableInfo,
-          schemaEvolutionEntry,
-          tableMeta.tablePath)(sparkSession)
-      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
-          s"('tableName'='$newTableName', " +
-          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
-      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
-        Some(oldDatabaseName)).quotedString)
-      LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
-      LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, "Rename table failed: " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil
-            .revertRenameTableChanges(oldTableIdentifier,
-              newTableName,
-              carbonTable.getStorePath,
-              carbonTable.getCarbonTableIdentifier.getTableId,
-              timeStamp)(
-              sparkSession)
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-        }
-        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-      // case specific to rename table as after table rename old table path will not be found
-      if (carbonTable != null) {
-        AlterTableUtil
-          .releaseLocksManually(locks,
-            locksToBeAcquired,
-            oldDatabaseName,
-            newTableName,
-            carbonTable.getStorePath)
-      }
-    }
-    Seq.empty
-  }
-
-  private def renameBadRecords(oldTableName: String,
-      newTableName: String,
-      dataBaseName: String) = {
-    val oldPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
-    val newPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
-    val fileType = FileFactory.getFileType(oldPath)
-    if (FileFactory.isFileExist(oldPath, fileType)) {
-      val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
-        .renameForce(newPath)
-      if (!renameSuccess) {
-        sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
-      }
-    }
-  }
-
-}
-
-private[sql] case class AlterTableDropColumns(
-    alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = alterTableDropColumnModel.tableName
-    val dbName = alterTableDropColumnModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    // get the latest carbon table and check for column existence
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      val partitionInfo = carbonTable.getPartitionInfo(tableName)
-      if (partitionInfo != null) {
-        val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
-          .map(_.getColumnName)
-        // check each column existence in the table
-        val partitionColumns = alterTableDropColumnModel.columns.filter {
-          tableColumn => partitionColumnSchemaList.contains(tableColumn)
-        }
-        if (partitionColumns.nonEmpty) {
-          throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
-                                                  s"$partitionColumns")
-        }
-      }
-      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
-      var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
-      .ColumnSchema]()
-      var keyColumnCountToBeDeleted = 0
-      // TODO: if deleted column list includes bucketted column throw an error
-      alterTableDropColumnModel.columns.foreach { column =>
-        var columnExist = false
-        tableColumns.foreach { tableColumn =>
-          // column should not be already deleted and should exist in the table
-          if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
-            if (tableColumn.isDimension) {
-              keyColumnCountToBeDeleted += 1
-              if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
-                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
-              }
-            }
-            columnExist = true
-          }
-        }
-        if (!columnExist) {
-          sys.error(s"Column $column does not exists in the table $dbName.$tableName")
-        }
-      }
-      // take the total key column count. key column to be deleted should not
-      // be >= key columns in schema
-      val totalKeyColumnInSchema = tableColumns.count {
-        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
-      }
-      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
-        sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
-      }
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      // maintain the deleted columns for schema evolution history
-      var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
-      alterTableDropColumnModel.columns.foreach { column =>
-        columnSchemaList.foreach { columnSchema =>
-          if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
-            deletedColumnSchema += columnSchema.deepCopy
-            columnSchema.invisible = true
-          }
-        }
-      }
-      // add deleted columns to schema evolution history and update the schema
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      // TODO: 1. add check for deletion of index tables
-      // delete dictionary files for dictionary column and clear dictionary cache from memory
-      new AlterTableDropColumnRDD(sparkSession.sparkContext,
-        dictionaryColumns,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
-      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception => LOGGER
-        .error("Alter table drop columns failed : " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}
-
-private[sql] case class AlterTableDataTypeChange(
-    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = alterTableDataTypeChangeModel.tableName
-    val dbName = alterTableDataTypeChangeModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    var locks = List.empty[ICarbonLock]
-    // get the latest carbon table and check for column existence
-    var carbonTable: CarbonTable = null
-    var timeStamp = 0L
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      val columnName = alterTableDataTypeChangeModel.columnName
-      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
-        LOGGER.audit(s"Alter table change data type request has failed. " +
-                     s"Column $columnName does not exist")
-        sys.error(s"Column does not exist: $columnName")
-      }
-      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
-      if (carbonColumn.size == 1) {
-        CarbonScalaUtil
-          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn(0))
-      } else {
-        LOGGER.audit(s"Alter table change data type request has failed. " +
-                     s"Column $columnName is invalid")
-        sys.error(s"Invalid Column: $columnName")
-      }
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      // maintain the added column for schema evolution history
-      var addColumnSchema: ColumnSchema = null
-      var deletedColumnSchema: ColumnSchema = null
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
-      columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
-          deletedColumnSchema = columnSchema.deepCopy
-          columnSchema.setData_type(DataTypeConverterUtil
-            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
-          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
-          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
-          addColumnSchema = columnSchema
-        }
-      }
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-        .setTime_stamp(System.currentTimeMillis)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception => LOGGER
-        .error("Alter table change datatype failed : " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
new file mode 100644
index 0000000..f5c6cba
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.util.CarbonUtil
+
+case class CarbonCreateTableCommand(
+    cm: TableModel,
+    createDSTable: Boolean = true)
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val tbName = cm.tableName
+    val dbName = cm.databaseName
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+
+    val tableInfo: TableInfo = TableNewProcessor(cm)
+
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(
+        s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
+        s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
+    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
+      sys.error("No Dimensions found. Table should have at least one dimesnion !")
+    }
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+      if (createDSTable) {
+        try {
+          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+               |(${ fields.map(f => f.rawSchema).mkString(",") })
+               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+            s""""$tablePath"$carbonSchemaString) """)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // call the drop table to delete the created table.
+            CarbonEnv.getInstance(sparkSession).carbonMetastore
+              .dropTable(tablePath, identifier)(sparkSession)
+
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
new file mode 100644
index 0000000..e5f6b75
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.CarbonRelation
+import org.codehaus.jackson.map.ObjectMapper
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+
+private[sql] case class CarbonDescribeFormattedCommand(
+    child: SparkPlan,
+    override val output: Seq[Attribute],
+    tblIdentifier: TableIdentifier)
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    var results: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+      case (groupId, _) => groupId != -1
+    }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map(colGroups => {
+      colGroups._2.map(dim => dim.getColName).mkString(", ")
+    })
+    var index = 1
+    groups.foreach { x =>
+      results = results :+ (s"Column Group $index", x, "")
+      index = index + 1
+    }
+    results
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val mapper = new ObjectMapper()
+    val colProps = StringBuilder.newBuilder
+    val dims = relation.metaData.dims.map(x => x.toLowerCase)
+    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
+      val fieldName = field.name.toLowerCase
+      val comment = if (dims.contains(fieldName)) {
+        val dimension = relation.metaData.carbonTable.getDimensionByName(
+          relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
+        if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
+          colProps.append(fieldName).append(".")
+            .append(mapper.writeValueAsString(dimension.getColumnProperties))
+            .append(",")
+        }
+        if (dimension.hasEncoding(Encoding.DICTIONARY) &&
+            !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          "DICTIONARY, KEY COLUMN" + (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
+            ""
+          } else {
+            ",NOINVERTEDINDEX"
+          })
+        } else {
+          "KEY COLUMN" + (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
+            ""
+          } else {
+            ",NOINVERTEDINDEX"
+          })
+        }
+      } else {
+        "MEASURE"
+      }
+      (field.name, field.dataType.simpleString, comment)
+    }
+    val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drops additional comma at end
+      colProps.toString().dropRight(1)
+    } else {
+      colProps.toString()
+    }
+    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
+      .getDatabaseName, "")
+    )
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
+    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
+      .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+    if (colPropStr.length() > 0) {
+      results ++= Seq((colPropStr, "", ""))
+    } else {
+      results ++= Seq(("ADAPTIVE", "", ""))
+    }
+    results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
+      relation.tableMeta.carbonTableIdentifier.getTableName).asScala
+      .map(column => column).mkString(","), ""))
+    val dimension = carbonTable
+      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    results ++= getColumnGroups(dimension.asScala.toList)
+    if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+      results ++=
+      Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+        .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
+    }
+    results.map { case (name, dataType, comment) =>
+      Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
new file mode 100644
index 0000000..5f70771
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
+case class CarbonDropTableCommand(
+    ifExistsSet: Boolean,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+    val carbonEnv = CarbonEnv.getInstance(sparkSession)
+    val catalog = carbonEnv.carbonMetastore
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+        dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
+    try {
+      locksToBeAcquired foreach {
+        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+      }
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
+        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
+    } finally {
+      if (carbonLocks.nonEmpty) {
+        val unlocked = carbonLocks.forall(_.unlock())
+        if (unlocked) {
+          logInfo("Table MetaData Unlocked Successfully")
+        }
+      }
+    }
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // delete the table folder
+    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val metadataFilePath =
+      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
+    val fileType = FileFactory.getFileType(metadataFilePath)
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
deleted file mode 100644
index c8c716e..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.command
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
-
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-/**
- * Carbon strategies for ddl commands
- */
-class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
-
-  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-    plan match {
-      case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
-        if CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(identifier)(sparkSession) =>
-        ExecutedCommandExec(LoadTable(identifier.database, identifier.table.toLowerCase, path,
-          Seq(), Map(), isOverwrite)) :: Nil
-      case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
-        val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
-        val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(tableIdentifier)(
-            sparkSession)
-        if (isCarbonTable) {
-          val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
-          ExecutedCommandExec(AlterTableRenameTable(renameModel)) :: Nil
-        } else {
-          ExecutedCommandExec(alter) :: Nil
-        }
-      case DropTableCommand(identifier, ifNotExists, isView, _)
-        if CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .isTablePathExists(identifier)(sparkSession) =>
-        ExecutedCommandExec(
-          CarbonDropTableCommand(ifNotExists, identifier.database,
-            identifier.table.toLowerCase)) :: Nil
-      case ShowLoadsCommand(databaseName, table, limit) =>
-        ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil
-      case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
-      _, child: LogicalPlan, overwrite, _) =>
-        ExecutedCommandExec(LoadTableByInsert(relation, child, overwrite.enabled)) :: Nil
-      case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
-        ExecutedCommandExec(createDb) :: Nil
-      case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
-        ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
-      case alterTable@AlterTableCompaction(altertablemodel) =>
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(TableIdentifier(altertablemodel.tableName,
-            altertablemodel.dbName))(sparkSession)
-        if (isCarbonTable) {
-          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
-              altertablemodel.compactionType.equalsIgnoreCase("major")) {
-            ExecutedCommandExec(alterTable) :: Nil
-          } else {
-            throw new MalformedCarbonCommandException(
-              "Unsupported alter operation on carbon table")
-          }
-        } else {
-          throw new MalformedCarbonCommandException(
-            "Operation not allowed : " + altertablemodel.alterSql)
-        }
-      case dataTypeChange@AlterTableDataTypeChange(alterTableChangeDataTypeModel) =>
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
-            alterTableChangeDataTypeModel.databaseName))(sparkSession)
-        if (isCarbonTable) {
-          ExecutedCommandExec(dataTypeChange) :: Nil
-        } else {
-          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
-        }
-      case addColumn@AlterTableAddColumns(alterTableAddColumnsModel) =>
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
-            alterTableAddColumnsModel.databaseName))(sparkSession)
-        if (isCarbonTable) {
-          ExecutedCommandExec(addColumn) :: Nil
-        } else {
-          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
-        }
-      case dropColumn@AlterTableDropColumns(alterTableDropColumnModel) =>
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
-            alterTableDropColumnModel.databaseName))(sparkSession)
-        if (isCarbonTable) {
-          ExecutedCommandExec(dropColumn) :: Nil
-        } else {
-          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
-        }
-      case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
-        if CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(identifier)(sparkSession) && isFormatted =>
-        val resolvedTable =
-          sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
-        val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
-        ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
-      case ShowPartitionsCommand(t, cols) =>
-        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .tableExists(t)(sparkSession)
-        if (isCarbonTable) {
-          ExecutedCommandExec(ShowCarbonPartitionsCommand(t)) :: Nil
-        } else {
-          ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil
-        }
-      case set@SetCommand(kv) =>
-        ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
-      case reset@ResetCommand =>
-        ExecutedCommandExec(CarbonResetCommand()) :: Nil
-      case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
-        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
-           && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
-        val updatedCatalog =
-          CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
-        val cmd =
-          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
-        ExecutedCommandExec(cmd) :: Nil
-      case _ => Nil
-    }
-  }
-
-}