Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/27 01:19:08 UTC

[1/4] incubator-carbondata git commit: Initial commit

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 28190eb71 -> e8dcd4296


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
new file mode 100644
index 0000000..9a3f828
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.catalog.CatalogColumn
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Concrete parser for Spark SQL statements and carbon-specific statements
+ */
+class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+
+  val astBuilder = new CarbonSqlAstBuilder(conf)
+
+  private val substitutor = new VariableSubstitution(conf)
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+
+  override def parsePlan(sqlText: String): LogicalPlan = {
+    try {
+      super.parsePlan(sqlText)
+    } catch {
+      case e: Throwable =>
+        astBuilder.parser.parse(sqlText)
+    }
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+
+  val parser = new CarbonSpark2SqlParser
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = Option(ctx.createFileFormat) match {
+      case Some(value) => value.storageHandler().STRING().getSymbol.getText
+      case _ => ""
+    }
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+      // TODO: implement temporary tables
+      if (temp) {
+        throw new ParseException(
+          "CREATE TEMPORARY TABLE is not supported yet. " +
+          "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+      }
+      if (ctx.skewSpec != null) {
+        operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+      }
+      if (ctx.bucketSpec != null) {
+        operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
+      }
+      val comment = Option(ctx.STRING).map(string)
+      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+      val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+        .getOrElse(Map.empty)
+
+      // Ensure that no duplicate column name is used in the table definition
+      val colNames = cols.map(_.name)
+      if (colNames.length != colNames.distinct.length) {
+        val duplicateColumns = colNames.groupBy(identity).collect {
+          case (x, ys) if ys.length > 1 => "\"" + x + "\""
+        }
+        operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
+                            duplicateColumns.mkString("[", ",", "]"), ctx)
+      }
+
+      // For Hive tables, partition columns must not be part of the schema
+      val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns may not be specified in the schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+      }
+
+      // Note: Hive requires partition columns to be distinct from the schema, so we need
+      // to include the partition columns here explicitly
+      val schema = cols ++ partitionCols
+
+      val fields = schema.map { col =>
+        val x = col.name + ' ' + col.dataType
+        val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x))
+        match {
+          case parser.Success(field, _) => field.asInstanceOf[Field]
+          case failureOrError => throw new MalformedCarbonCommandException(
+            s"Unsupported data type: $col.getType")
+        }
+        // the data type of a decimal column arrives as e.g. decimal(10,0),
+        // so check the prefix of the string, extract the precision and scale,
+        // and reset the data type to plain "decimal"
+        if (f.dataType.getOrElse("").startsWith("decimal")) {
+          val (precision, scale) = parser.getScaleAndPrecision(col.dataType)
+          f.precision = precision
+          f.scale = scale
+          f.dataType = Some("decimal")
+        }
+        if(f.dataType.getOrElse("").startsWith("char")) {
+          f.dataType = Some("char")
+        }
+        f.rawSchema = x
+        f
+      }
+
+      // validate tblProperties
+      if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
+        throw new MalformedCarbonCommandException("Invalid table properties")
+      }
+      // prepare table model of the collected tokens
+      val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+        name.database,
+        name.table,
+        fields,
+        Seq(),
+        properties.asJava.asScala)
+
+      CreateTable(tableModel)
+    } else {
+      super.visitCreateTable(ctx)
+    }
+  }
+
+  /**
+   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+   */
+  private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+    val props = visitTablePropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v == null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
+    }
+    props
+  }
+
+  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = {
+    withOrigin(ctx) {
+      ctx.colType.asScala.map { col =>
+        CatalogColumn(
+          col.identifier.getText.toLowerCase,
+          // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+          // just convert the whole type string to lower case, otherwise the struct field names
+          // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+          // case before passing it to Hive.
+          typedVisit[DataType](col.dataType).catalogString,
+          nullable = true,
+          Option(col.STRING).map(string))
+      }
+    }
+  }
+}
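
The parser above delegates every statement to Spark's own AST builder first; only CREATE TABLE statements whose file format is 'carbondata' or 'org.apache.carbondata.format' are converted into a carbon TableModel, and anything Spark's grammar rejects falls back to the CarbonSpark2SqlParser instance held by the AST builder. A minimal sketch of driving the parser directly (not part of this commit; the table definition is a placeholder and the spark2 integration classes are assumed to be on the classpath):

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.parser.CarbonSparkSqlParser

    val parser = new CarbonSparkSqlParser(new SQLConf)

    // Routed through CarbonSqlAstBuilder.visitCreateTable because the statement
    // uses STORED BY 'carbondata'; the result is a CreateTable(tableModel) plan.
    val carbonPlan = parser.parsePlan(
      "CREATE TABLE t1(id INT, name STRING) STORED BY 'carbondata'")

    // Plain Spark SQL keeps going through SparkSqlAstBuilder as usual.
    val sparkPlan = parser.parsePlan("SELECT name FROM t1 WHERE id > 10")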

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c84882e..399b3e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     cleanFiles(spark, dbName, tableName, storePath)
   }
 }
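
With this change the standalone maintenance tools initialize CarbonEnv from the SparkSession itself instead of its SQLContext. A small invocation sketch (store path and table name below are placeholders; the tool is normally launched as a Spark application):

    // args(0) is the carbon store path, args(1) is [dbName.]tableName,
    // exactly as parsed in CleanFiles.main above.
    org.apache.spark.util.CleanFiles.main(
      Array("/tmp/carbon/store", "default.carbon_table"))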

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 1e891fd..2db6e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -41,7 +41,7 @@ object Compaction {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index ae95bf6..951cd7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index d5a6861..dad9f59 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 1a02c8c..c953089 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
       None
     }
     val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     val rows = showSegments(spark, dbName, tableName, limit)
     System.out.println(showString(rows))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 8b10aa4..424d8fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -86,7 +86,7 @@ object TableLoader {
 
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
-    CarbonEnv.init(spark.sqlContext)
+    CarbonEnv.init(spark)
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 45dcb03..4310d04 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -54,6 +54,11 @@ class QueryTest extends PlanTest {
       clean(metastoredb)
     }
 
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+
+    import org.apache.spark.sql.CarbonSession._
     val spark = SparkSession
         .builder()
         .master("local")
@@ -62,17 +67,13 @@ class QueryTest extends PlanTest {
         .config("spark.sql.warehouse.dir", warehouse)
         .config("javax.jdo.option.ConnectionURL",
           s"jdbc:derby:;databaseName=$metastoredb;create=true")
-        .getOrCreate()
-
-    CarbonProperties.getInstance()
-        .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
-        .addProperty("carbon.storelocation", storeLocation)
+        .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
     spark
   }
 
-  val sc = spark.sparkContext
+  val Dsc = spark.sparkContext
 
   lazy val implicits = spark.implicits
 


[4/4] incubator-carbondata git commit: [CARBONDATA-547] Added CarbonSession and enabled parser to use carbon custom commands This closes #448

Posted by ja...@apache.org.
[CARBONDATA-547] Added CarbonSession and enabled parser to use carbon custom commands. This closes #448
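
The new entry point is the getOrCreateCarbonSession() builder extension used by CarbonSessionExample and the spark2 QueryTest below. A minimal usage sketch (paths are placeholders; it assumes the carbon store location and kettle home have been set through CarbonProperties, and it omits the Hive metastore settings shown in the example):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._  // adds getOrCreateCarbonSession to the builder

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonSessionSketch")
      .config("spark.sql.warehouse.dir", "/tmp/carbon/warehouse")  // placeholder path
      .getOrCreateCarbonSession()

    // The session's parser now accepts carbon-specific DDL.
    spark.sql("CREATE TABLE sketch_table(id INT, name STRING) STORED BY 'carbondata'")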


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e8dcd429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e8dcd429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e8dcd429

Branch: refs/heads/master
Commit: e8dcd4296b3eec453f100290afe39489edbd151f
Parents: 28190eb bedc96d
Author: jackylk <ja...@huawei.com>
Authored: Tue Dec 27 09:17:42 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 27 09:17:42 2016 +0800

----------------------------------------------------------------------
 .../carbondata/examples/CarbonExample.scala     | 173 ----
 .../examples/CarbonSessionExample.scala         | 144 +++
 .../examples/SparkSessionExample.scala          | 173 ++++
 .../catalyst/AbstractCarbonSparkSQLParser.scala | 137 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 968 +++++++++++++++++++
 .../execution/command/carbonTableSchema.scala   |   8 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 938 +-----------------
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../spark/sql/CarbonCatalystOperators.scala     |  36 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 133 +++
 .../org/apache/spark/sql/CarbonSource.scala     |  12 +-
 .../execution/CarbonLateDecodeStrategy.scala    |   6 +-
 .../sql/execution/command/DDLStrategy.scala     |  83 ++
 .../execution/command/carbonTableSchema.scala   | 319 +++++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   9 +
 .../spark/sql/hive/CarbonSessionState.scala     |  38 +
 .../sql/parser/CarbonSpark2SqlParser.scala      | 125 +++
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 178 ++++
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   2 +-
 .../spark/carbondata/util/QueryTest.scala       |   0
 .../sql/common/util/CarbonSessionTest.scala     |   0
 .../spark/sql/common/util/QueryTest.scala       |  13 +-
 28 files changed, 2366 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-carbondata git commit: Initial commit

Posted by ja...@apache.org.
Initial commit

Added comments

Fixed style and testcases

Refactored code

Fixed issue

Rebased

Rebased

fixed comments

fixed style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bedc96d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bedc96d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bedc96d0

Branch: refs/heads/master
Commit: bedc96d059fe25ebec298220b92243e87c496a0c
Parents: 28190eb
Author: ravipesala <ra...@gmail.com>
Authored: Mon Dec 19 18:57:11 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 27 08:48:59 2016 +0800

----------------------------------------------------------------------
 .../carbondata/examples/CarbonExample.scala     | 173 ----
 .../examples/CarbonSessionExample.scala         | 144 +++
 .../examples/SparkSessionExample.scala          | 173 ++++
 .../catalyst/AbstractCarbonSparkSQLParser.scala | 137 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 968 +++++++++++++++++++
 .../execution/command/carbonTableSchema.scala   |   8 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 938 +-----------------
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../spark/sql/CarbonCatalystOperators.scala     |  36 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 133 +++
 .../org/apache/spark/sql/CarbonSource.scala     |  12 +-
 .../execution/CarbonLateDecodeStrategy.scala    |   6 +-
 .../sql/execution/command/DDLStrategy.scala     |  83 ++
 .../execution/command/carbonTableSchema.scala   | 319 +++++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   9 +
 .../spark/sql/hive/CarbonSessionState.scala     |  38 +
 .../sql/parser/CarbonSpark2SqlParser.scala      | 125 +++
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 178 ++++
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   2 +-
 .../spark/carbondata/util/QueryTest.scala       |   0
 .../sql/common/util/CarbonSessionTest.scala     |   0
 .../spark/sql/common/util/QueryTest.scala       |  13 +-
 28 files changed, 2366 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
deleted file mode 100644
index 273de95..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CleanFiles, ShowSegments}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-object CarbonExample {
-
-  def main(args: Array[String]): Unit = {
-    val rootPath = new File(this.getClass.getResource("/").getPath
-        + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
-    // clean data folder
-    if (true) {
-      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
-      clean(storeLocation)
-      clean(warehouse)
-      clean(metastoredb)
-    }
-
-    val spark = SparkSession
-        .builder()
-        .master("local")
-        .appName("CarbonExample")
-        .enableHiveSupport()
-        .config("spark.sql.warehouse.dir", warehouse)
-        .config("javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$metastoredb;create=true")
-        .getOrCreate()
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
-      .addProperty("carbon.storelocation", storeLocation)
-
-    spark.sparkContext.setLogLevel("WARN")
-
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    // Create table
-    spark.sql(
-      s"""
-         | CREATE TABLE carbon_table(
-         |    shortField short,
-         |    intField int,
-         |    bigintField long,
-         |    doubleField double,
-         |    stringField string,
-         |    timestampField timestamp,
-         |    decimalField decimal(18,2),
-         |    dateField date,
-         |    charField char(5)
-         | )
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
-         |   'dbName'='default', 'tableName'='carbon_table')
-       """.stripMargin)
-
-    // val prop = s"$rootPath/conf/dataload.properties.template"
-    // val tableName = "carbon_table"
-    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-    // TableLoader.main(Array[String](prop, tableName, path))
-
-    spark.sql(
-      s"""
-         | CREATE TABLE csv_table
-         | (  shortField short,
-         |    intField int,
-         |    bigintField long,
-         |    doubleField double,
-         |    stringField string,
-         |    timestampField string,
-         |    decimalField decimal(18,2),
-         |    dateField string,
-         |    charField char(5))
-         |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-       """.stripMargin)
-
-    spark.sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path'
-         | INTO TABLE csv_table
-       """.stripMargin)
-
-    spark.sql("""
-             SELECT *
-             FROM csv_table
-              """).show
-
-    spark.sql(
-      s"""
-         | INSERT INTO TABLE carbon_table
-         | SELECT shortField, intField, bigintField, doubleField, stringField,
-         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
-         | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
-         | FROM csv_table
-       """.stripMargin)
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table
-             where stringfield = 'spark' and decimalField > 40
-              """).show
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table where length(stringField) = 5
-              """).show
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
-              """).show
-
-    spark.sql("""
-             select count(stringField) from carbon_table
-              """.stripMargin).show
-
-    spark.sql("""
-           SELECT sum(intField), stringField
-           FROM carbon_table
-           GROUP BY stringField
-           """).show
-
-    spark.sql(
-      """
-        |select t1.*, t2.*
-        |from carbon_table t1, carbon_table t2
-        |where t1.stringField = t2.stringField
-      """.stripMargin).show
-
-    spark.sql(
-      """
-        |with t1 as (
-        |select * from carbon_table
-        |union all
-        |select * from carbon_table
-        |)
-        |select t1.*, t2.*
-        |from t1, carbon_table t2
-        |where t1.stringField = t2.stringField
-      """.stripMargin).show
-
-    // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
new file mode 100644
index 0000000..4923e5b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonSessionExample {
+
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonExample")
+      .enableHiveSupport()
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("javax.jdo.option.ConnectionURL",
+    s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreateCarbonSession()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField char(5)
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField')
+       """.stripMargin)
+    // scalastyle:on
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where length(stringField) = 5
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+              """).show
+
+    spark.sql("""
+             select count(stringField) from carbon_table
+              """.stripMargin).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+              """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
new file mode 100644
index 0000000..2affbe2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CleanFiles, ShowSegments}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object SparkSessionExample {
+
+  def main(args: Array[String]): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
+    val spark = SparkSession
+        .builder()
+        .master("local")
+        .appName("CarbonExample")
+        .enableHiveSupport()
+        .config("spark.sql.warehouse.dir", warehouse)
+        .config("javax.jdo.option.ConnectionURL",
+          s"jdbc:derby:;databaseName=$metastoredb;create=true")
+        .getOrCreate()
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField char(5)
+         | )
+         | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
+         |   'dbName'='default', 'tableName'='carbon_table')
+       """.stripMargin)
+
+    // val prop = s"$rootPath/conf/dataload.properties.template"
+    // val tableName = "carbon_table"
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+    // TableLoader.main(Array[String](prop, tableName, path))
+
+    spark.sql(
+      s"""
+         | CREATE TABLE csv_table
+         | (  shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField string,
+         |    decimalField decimal(18,2),
+         |    dateField string,
+         |    charField char(5))
+         |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE csv_table
+       """.stripMargin)
+
+    spark.sql("""
+             SELECT *
+             FROM csv_table
+              """).show
+
+    spark.sql(
+      s"""
+         | INSERT INTO TABLE carbon_table
+         | SELECT shortField, intField, bigintField, doubleField, stringField,
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
+         | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
+         | FROM csv_table
+       """.stripMargin)
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where length(stringField) = 5
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+              """).show
+
+    spark.sql("""
+             select count(stringField) from carbon_table
+              """.stripMargin).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+           """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+    spark.sql("DROP TABLE IF EXISTS csv_table")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
new file mode 100644
index 0000000..fba7976
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.lexical.StdLexical
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+private[sql] abstract class AbstractCarbonSparkSQLParser
+  extends StandardTokenParsers with PackratParsers {
+
+  def parse(input: String): LogicalPlan = synchronized {
+    // Initialize the Keywords.
+    initLexical
+    phrase(start)(new lexical.Scanner(input)) match {
+      case Success(plan, _) => plan
+      case failureOrError => sys.error(failureOrError.toString)
+    }
+  }
+  /* One-time initialization of lexical. This avoids reinitializing lexical in the parse method. */
+  protected lazy val initLexical: Unit = lexical.initialize(reservedWords)
+
+  protected case class Keyword(str: String) {
+    def normalize: String = lexical.normalizeKeyword(str)
+    def parser: Parser[String] = normalize
+  }
+
+  protected implicit def asParser(k: Keyword): Parser[String] = k.parser
+
+  // By default, use Reflection to find the reserved words defined in the sub class.
+  // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this
+  // method during the parent class instantiation, because the sub class instance
+  // isn't created yet.
+  protected lazy val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].normalize)
+
+  // Set the keywords as empty by default, will change that later.
+  override val lexical = new SqlLexical
+
+  protected def start: Parser[LogicalPlan]
+
+  // Returns the whole input string
+  protected lazy val wholeInput: Parser[String] = new Parser[String] {
+    def apply(in: Input): ParseResult[String] =
+      Success(in.source.toString, in.drop(in.source.length()))
+  }
+
+  // Returns the rest of the input string that are not parsed yet
+  protected lazy val restInput: Parser[String] = new Parser[String] {
+    def apply(in: Input): ParseResult[String] =
+      Success(
+        in.source.subSequence(in.offset, in.source.length()).toString,
+        in.drop(in.source.length()))
+  }
+}
+
+class SqlLexical extends StdLexical {
+  case class FloatLit(chars: String) extends Token {
+    override def toString: String = chars
+  }
+
+  /* This is a work around to support the lazy setting */
+  def initialize(keywords: Seq[String]): Unit = {
+    reserved.clear()
+    reserved ++= keywords
+  }
+
+  /* Normalize the keyword string */
+  def normalizeKeyword(str: String): String = str.toLowerCase
+
+  delimiters += (
+    "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+    ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
+    )
+
+  protected override def processIdent(name: String) = {
+    val token = normalizeKeyword(name)
+    if (reserved contains token) Keyword(token) else Identifier(name)
+  }
+
+  override lazy val token: Parser[Token] =
+    ( identChar ~ (identChar | digit).* ^^
+      { case first ~ rest => processIdent((first :: rest).mkString) }
+      | digit.* ~ identChar ~ (identChar | digit).* ^^
+        { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
+      | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
+      case i ~ None => NumericLit(i.mkString)
+      case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+    }
+      | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
+        { case chars => StringLit(chars mkString "") }
+      | '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
+        { case chars => StringLit(chars mkString "") }
+      | '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
+        { case chars => Identifier(chars mkString "") }
+      | EofCh ^^^ EOF
+      | '\'' ~> failure("unclosed string literal")
+      | '"' ~> failure("unclosed string literal")
+      | delim
+      | failure("illegal character")
+      )
+
+  override def identChar: Parser[Elem] = letter | elem('_')
+
+  override def whitespace: Parser[Any] =
+    ( whitespaceChar
+      | '/' ~ '*' ~ comment
+      | '/' ~ '/' ~ chrExcept(EofCh, '\n').*
+      | '#' ~ chrExcept(EofCh, '\n').*
+      | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
+      | '/' ~ '*' ~ failure("unclosed comment")
+      ).*
+}
+
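
SqlLexical only learns its reserved words through initialize(), which the abstract parser defers to the first parse call via initLexical. A small sketch of the resulting tokenization (the keyword list here is only an illustration, not the real carbon keyword set):

    import org.apache.spark.sql.catalyst.SqlLexical

    val lexical = new SqlLexical
    // Reserved words are supplied lazily; these four are illustrative only.
    lexical.initialize(Seq("clean", "files", "for", "table"))

    val scanner = new lexical.Scanner("CLEAN FILES FOR TABLE t1")
    // processIdent lower-cases "CLEAN" and, finding it in the reserved set,
    // emits a Keyword token; "t1" is not reserved, so it stays an Identifier.
    println(scanner.first)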

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
new file mode 100644
index 0000000..a5088df
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -0,0 +1,968 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.regex.{Matcher, Pattern}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.language.implicitConversions
+import scala.util.matching.Regex
+
+import org.apache.hadoop.hive.ql.lib.Node
+import org.apache.hadoop.hive.ql.parse._
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * TODO remove the duplicate code and add the common methods to common class.
+ * Parser for All Carbon DDL cases
+ */
+abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  protected val AGGREGATE = carbonKeyWord("AGGREGATE")
+  protected val AS = carbonKeyWord("AS")
+  protected val AGGREGATION = carbonKeyWord("AGGREGATION")
+  protected val ALL = carbonKeyWord("ALL")
+  protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
+  protected val BEFORE = carbonKeyWord("BEFORE")
+  protected val BY = carbonKeyWord("BY")
+  protected val CARDINALITY = carbonKeyWord("CARDINALITY")
+  protected val CASCADE = carbonKeyWord("CASCADE")
+  protected val CLASS = carbonKeyWord("CLASS")
+  protected val CLEAN = carbonKeyWord("CLEAN")
+  protected val COLS = carbonKeyWord("COLS")
+  protected val COLUMNS = carbonKeyWord("COLUMNS")
+  protected val COMPACT = carbonKeyWord("COMPACT")
+  protected val CREATE = carbonKeyWord("CREATE")
+  protected val CUBE = carbonKeyWord("CUBE")
+  protected val CUBES = carbonKeyWord("CUBES")
+  protected val DATA = carbonKeyWord("DATA")
+  protected val DATABASE = carbonKeyWord("DATABASE")
+  protected val DATABASES = carbonKeyWord("DATABASES")
+  protected val DELETE = carbonKeyWord("DELETE")
+  protected val DELIMITER = carbonKeyWord("DELIMITER")
+  protected val DESCRIBE = carbonKeyWord("DESCRIBE")
+  protected val DESC = carbonKeyWord("DESC")
+  protected val DETAIL = carbonKeyWord("DETAIL")
+  protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
+  protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
+  protected val DROP = carbonKeyWord("DROP")
+  protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
+  protected val EXCLUDE = carbonKeyWord("EXCLUDE")
+  protected val EXPLAIN = carbonKeyWord("EXPLAIN")
+  protected val EXTENDED = carbonKeyWord("EXTENDED")
+  protected val FORMATTED = carbonKeyWord("FORMATTED")
+  protected val FACT = carbonKeyWord("FACT")
+  protected val FIELDS = carbonKeyWord("FIELDS")
+  protected val FILEHEADER = carbonKeyWord("FILEHEADER")
+  protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
+  protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
+  protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
+  protected val FILES = carbonKeyWord("FILES")
+  protected val FROM = carbonKeyWord("FROM")
+  protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
+  protected val IN = carbonKeyWord("IN")
+  protected val INCLUDE = carbonKeyWord("INCLUDE")
+  protected val INPATH = carbonKeyWord("INPATH")
+  protected val INTO = carbonKeyWord("INTO")
+  protected val LEVELS = carbonKeyWord("LEVELS")
+  protected val LIKE = carbonKeyWord("LIKE")
+  protected val LOAD = carbonKeyWord("LOAD")
+  protected val LOCAL = carbonKeyWord("LOCAL")
+  protected val MAPPED = carbonKeyWord("MAPPED")
+  protected val MEASURES = carbonKeyWord("MEASURES")
+  protected val MULTILINE = carbonKeyWord("MULTILINE")
+  protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
+  protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
+  protected val OPTIONS = carbonKeyWord("OPTIONS")
+  protected val OUTPATH = carbonKeyWord("OUTPATH")
+  protected val OVERWRITE = carbonKeyWord("OVERWRITE")
+  protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
+  protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
+  protected val PARTITIONER = carbonKeyWord("PARTITIONER")
+  protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
+  protected val RELATION = carbonKeyWord("RELATION")
+  protected val SCHEMA = carbonKeyWord("SCHEMA")
+  protected val SCHEMAS = carbonKeyWord("SCHEMAS")
+  protected val SHOW = carbonKeyWord("SHOW")
+  protected val TABLES = carbonKeyWord("TABLES")
+  protected val TABLE = carbonKeyWord("TABLE")
+  protected val TERMINATED = carbonKeyWord("TERMINATED")
+  protected val TYPE = carbonKeyWord("TYPE")
+  protected val USE = carbonKeyWord("USE")
+  protected val WHERE = carbonKeyWord("WHERE")
+  protected val WITH = carbonKeyWord("WITH")
+  protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
+  protected val ABS = carbonKeyWord("abs")
+
+  protected val FOR = carbonKeyWord("FOR")
+  protected val SCRIPTS = carbonKeyWord("SCRIPTS")
+  protected val USING = carbonKeyWord("USING")
+  protected val LIMIT = carbonKeyWord("LIMIT")
+  protected val DEFAULTS = carbonKeyWord("DEFAULTS")
+  protected val ALTER = carbonKeyWord("ALTER")
+  protected val ADD = carbonKeyWord("ADD")
+
+  protected val IF = carbonKeyWord("IF")
+  protected val NOT = carbonKeyWord("NOT")
+  protected val EXISTS = carbonKeyWord("EXISTS")
+  protected val DIMENSION = carbonKeyWord("DIMENSION")
+  protected val STARTTIME = carbonKeyWord("STARTTIME")
+  protected val SEGMENTS = carbonKeyWord("SEGMENTS")
+  protected val SEGMENT = carbonKeyWord("SEGMENT")
+
+  protected val STRING = carbonKeyWord("STRING")
+  protected val INTEGER = carbonKeyWord("INTEGER")
+  protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
+  protected val DATE = carbonKeyWord("DATE")
+  protected val CHAR = carbonKeyWord("CHAR")
+  protected val NUMERIC = carbonKeyWord("NUMERIC")
+  protected val DECIMAL = carbonKeyWord("DECIMAL")
+  protected val DOUBLE = carbonKeyWord("DOUBLE")
+  protected val SHORT = carbonKeyWord("SMALLINT")
+  protected val INT = carbonKeyWord("INT")
+  protected val BIGINT = carbonKeyWord("BIGINT")
+  protected val ARRAY = carbonKeyWord("ARRAY")
+  protected val STRUCT = carbonKeyWord("STRUCT")
+
+  protected val doubleQuotedString = "\"([^\"]+)\"".r
+  protected val singleQuotedString = "'([^']+)'".r
+
+  protected val newReservedWords =
+    this.getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = {
+    val sqllex = new SqlLexical()
+    sqllex.initialize(newReservedWords)
+    sqllex
+
+  }
+
+  import lexical.Identifier
+
+  implicit def regexToParser(regex: Regex): Parser[String] = {
+    acceptMatch(
+    s"identifier matching regex ${ regex }",
+    { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
+    )
+  }
+
+  /**
+   * This converts a keyword into a case-insensitive regular expression.
+   *
+   * @param keys
+   * @return
+   */
+  private def carbonKeyWord(keys: String) = {
+    ("(?i)" + keys).r
+  }
+
+  protected val escapedIdentifier = "`([^`]+)`".r
+
+  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
+    var complexDimensions: Seq[Field] = Seq()
+    var dimensions: Seq[Field] = Seq()
+    dims.foreach { dimension =>
+      dimension.dataType.getOrElse("NIL") match {
+        case "Array" => complexDimensions = complexDimensions :+ dimension
+        case "Struct" => complexDimensions = complexDimensions :+ dimension
+        case _ => dimensions = dimensions :+ dimension
+      }
+    }
+    dimensions ++ complexDimensions
+  }
+
+
+
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+    m.find()
+    val matchedString: String = m.group(1)
+    val scaleAndPrecision = matchedString.split(",")
+    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
+  }
+
+  /**
+   * This will prepare the Model from the Tree details.
+   *
+   * @param ifNotExistPresent
+   * @param dbName
+   * @param tableName
+   * @param fields
+   * @param partitionCols
+   * @param tableProperties
+   * @return
+   */
+  def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): TableModel
+  = {
+
+    fields.zipWithIndex.foreach { x =>
+      x._1.schemaOrdinal = x._2
+    }
+    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+      fields, tableProperties)
+    if (dims.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        s"Table ${ dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME) }.$tableName" +
+        " can not be created without key columns. Please use DICTIONARY_INCLUDE or" +
+        " DICTIONARY_EXCLUDE to set at least one key column" +
+        " if all specified columns are numeric types")
+    }
+    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+
+    // column properties
+    val colProps = extractColumnProperties(fields, tableProperties)
+    // get column groups configuration from table properties.
+    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
+      noDictionaryDims, msrs, dims)
+
+    // get no inverted index columns from table properties.
+    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+
+    // validate the tableBlockSize from table properties
+    CommonUtil.validateTableBlockSize(tableProperties)
+
+    TableModel(
+      ifNotExistPresent,
+      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+      dbName,
+      tableName,
+      tableProperties,
+      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+      msrs.map(f => normalizeType(f)),
+      Option(noDictionaryDims),
+      Option(noInvertedIdxCols),
+      groupCols,
+      Some(colProps))
+  }
+
+  /**
+   * Extracts the column groups configuration from table properties.
+   * Row groups of fields are determined based on this configuration.
+   *
+   * @param tableProperties table properties from TBLPROPERTIES
+   * @param noDictionaryDims dimensions without dictionary encoding
+   * @param msrs measure fields
+   * @param dims dimension fields
+   * @return the column groups arranged in schema order, or null if none are configured
+   */
+  protected def updateColumnGroupsInField(tableProperties: Map[String, String],
+      noDictionaryDims: Seq[String],
+      msrs: Seq[Field],
+      dims: Seq[Field]): Seq[String] = {
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
+
+      var splittedColGrps: Seq[String] = Seq[String]()
+      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+
+      // column groups are specified in table properties like "(col1,col2),(col3,col4)";
+      // first split the value on the parentheses so the above becomes two strings:
+      // [col1,col2] [col3,col4]
+      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
+      while (m.find()) {
+        val oneGroup: String = m.group(1)
+        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
+        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
+        splittedColGrps :+= arrangedColGrp
+      }
+      // the groups are then arranged in schema order below.
+      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
+    } else {
+      null
+    }
+  }
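+
+  // Illustrative example (assuming CommonUtil.arrangeColGrpsInSchemaOrder keeps the groups
+  // intact): with COLUMN_GROUPS set to "(col1,col2),(col3,col4)" and col1..col4 declared in
+  // that order, this returns Seq("col1,col2", "col3,col4"); without the property it
+  // returns null.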
+
+  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
+    // if the columns in the column group are not in schema order, rearrange them in schema order
+    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
+    colGroup.split(',').map(_.trim).foreach { x =>
+      dims.zipWithIndex.foreach { dim =>
+        if (dim._1.column.equalsIgnoreCase(x)) {
+          colGrpFieldIndx :+= dim._2
+        }
+      }
+    }
+    // sort it
+    colGrpFieldIndx = colGrpFieldIndx.sorted
+    // check that the columns in the column group are in schema order
+    if (!checkIfInSequence(colGrpFieldIndx)) {
+      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
+    }
+    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
+      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
+        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
+          throw new MalformedCarbonCommandException(
+            "Invalid column group,column in group should be contiguous as per schema.")
+        }
+      }
+      true
+    }
+    val colGrpNames: StringBuilder = StringBuilder.newBuilder
+    for (i <- colGrpFieldIndx.indices) {
+      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
+      if (i < (colGrpFieldIndx.length - 1)) {
+        colGrpNames.append(",")
+      }
+    }
+    colGrpNames.toString()
+  }
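+
+  // Illustrative example: with dims declared as a, b, c, the group "c,b" is rewritten
+  // as "b,c", while the group "a,c" is rejected because the columns are not contiguous
+  // in schema order.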
+
+  /**
+   * Builds the partitioner object from the partition columns and table properties.
+   *
+   * @param partitionCols partition columns from the create statement
+   * @param tableProperties table properties from TBLPROPERTIES
+   * @return the Partitioner if partition columns are given, otherwise None
+   */
+  protected def getPartitionerObject(
+      partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): Option[Partitioner] = {
+
+    // the partition class is empty by default;
+    // it is set to its default value later in the table schema.
+    var partitionClass: String = ""
+    var partitionCount: Int = 1
+    var partitionColNames: Array[String] = Array[String]()
+    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
+      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
+    }
+
+    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
+      try {
+        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
+      } catch {
+        case e: Exception => // no need to do anything.
+      }
+    }
+
+    partitionCols.foreach(col =>
+      partitionColNames :+= col.partitionColumn
+    )
+
+    // this means user has given partition cols list
+    if (!partitionColNames.isEmpty) {
+      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+    }
+    // if partition cols are not given then no need to do partition.
+    None
+  }
+
+  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+  java.util.Map[String, java.util.List[ColumnProperty]] = {
+    val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
+      } else {
+        fillColumnProperty(None, field.column, tableProperties, colPropMap)
+      }
+    }
+    colPropMap
+  }
+
+  protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    fieldChildren.foreach(fields => {
+      fields.foreach(field => {
+        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
+      }
+      )
+    }
+    )
+  }
+
+  protected def fillColumnProperty(parentColumnName: Option[String],
+      columnName: String,
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
+    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
+    if (colProps.isDefined) {
+      colPropMap.put(colProKey, colProps.get)
+    }
+  }
+
+  def getKey(parentColumnName: Option[String],
+      columnName: String): (String, String) = {
+    if (parentColumnName.isDefined) {
+      if (columnName == "val") {
+        (parentColumnName.get, parentColumnName.get + "." + columnName)
+      } else {
+        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
+      }
+    } else {
+      (columnName, columnName)
+    }
+  }
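+
+  // Illustrative example: getKey(Some("arr"), "val") returns ("arr", "arr.val"), so array
+  // children look up column properties under the parent name; getKey(Some("person"), "name")
+  // returns ("person.name", "person.name") and getKey(None, "id") returns ("id", "id").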
+
+  /**
+   * Extracts the columns for which no inverted index should be created.
+   * By default all dimensions use an inverted index.
+   *
+   * @param fields table columns
+   * @param tableProperties table properties from TBLPROPERTIES
+   * @return the NO_INVERTED_INDEX column names
+   */
+  protected def extractNoInvertedIndexColumns(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[String] = {
+    // check whether the column name is in fields
+    var noInvertedIdxColsProps: Array[String] = Array[String]()
+    var noInvertedIdxCols: Seq[String] = Seq[String]()
+
+    if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
+      noInvertedIdxColsProps =
+        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
+      noInvertedIdxColsProps.map { noInvertedIdxColProp =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+                           " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+    }
+    // remove duplicate columns so each configured column is considered only once
+    val distinctCols = noInvertedIdxColsProps.toSet
+    // extract the no inverted index columns
+    fields.foreach(field => {
+      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        noInvertedIdxCols :+= field.column
+      }
+    }
+    )
+    noInvertedIdxCols
+  }
+
+  /**
+   * Extracts the dimension fields and the no-dictionary dimension names.
+   * By default all string columns are treated as dimensions.
+   *
+   * @param fields table columns
+   * @param tableProperties table properties from TBLPROPERTIES
+   * @return the dimension fields and the no-dictionary dimension column names
+   */
+  protected def extractDimColsAndNoDictionaryFields(
+      fields: Seq[Field],
+      tableProperties: Map[String, String]): (Seq[Field], Seq[String]) = {
+    var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var dictExcludeCols: Array[String] = Array[String]()
+    var noDictionaryDims: Seq[String] = Seq[String]()
+    var dictIncludeCols: Seq[String] = Seq[String]()
+
+    // All excluded cols should be there in create table cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+      dictExcludeCols
+        .map { dictExcludeCol =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
+            val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
+                           " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          } else {
+            val dataType = fields.find(x =>
+              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
+            if (isComplexDimDictionaryExclude(dataType)) {
+              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
+                             dictExcludeCol
+              throw new MalformedCarbonCommandException(errormsg)
+            } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
+              val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
+                             " data type column: " + dictExcludeCol
+              throw new MalformedCarbonCommandException(errorMsg)
+            }
+          }
+        }
+    }
+    // All included cols should be there in create table cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
+      dictIncludeCols.map { distIncludeCol =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+                         " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+
+    // DICTIONARY_INCLUDE and DICTIONARY_EXCLUDE must not contain the same column
+    dictExcludeCols.foreach { dicExcludeCol =>
+      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
+        val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
+                       " with DICTIONARY_INCLUDE. Please check create table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+
+    // by default all string columns are dimensions; a dictionary-excluded column is also added
+    // to the noDictionaryDims list (unless it is a timestamp or date). All dictionary
+    // include/exclude columns are treated as dimensions.
+    fields.foreach(field => {
+
+      if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
+          noDictionaryDims :+= field.column
+        }
+        dimFields += field
+      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        dimFields += (field)
+      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+        dimFields += (field)
+      }
+    }
+    )
+
+    (dimFields.toSeq, noDictionaryDims)
+  }
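+
+  // Illustrative example: for fields (name string, id int, ts timestamp) with
+  // DICTIONARY_EXCLUDE = "name", the result is (Seq(name, ts), Seq("name")): name and ts
+  // become dimensions and name additionally skips dictionary encoding.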
+
+  /**
+   * It fills non string dimensions in dimFields
+   */
+  def fillNonStringDimension(dictIncludeCols: Seq[String],
+      field: Field, dimFields: LinkedHashSet[Field]) {
+    var dictInclude = false
+    if (dictIncludeCols.nonEmpty) {
+      dictIncludeCols.foreach(dictIncludeCol =>
+        if (field.column.equalsIgnoreCase(dictIncludeCol)) {
+          dictInclude = true
+        })
+    }
+    if (dictInclude) {
+      dimFields += field
+    }
+  }
+
+  /**
+   * Detects whether the given data type is treated as a dimension data type.
+   *
+   * @param dimensionDatatype the column data type name
+   */
+  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
+    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
+  }
+
+  /**
+   * detects whether complex dimension is part of dictionary_exclude
+   */
+  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
+    val dimensionType = Array("array", "struct")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
+  }
+
+  /**
+   * detects whether a string or timestamp column is allowed in dictionary_exclude
+   */
+  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
+    val dataTypes = Array("string", "timestamp")
+    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+  }
+
+  /**
+   * Extracts the measure fields. By default all non-string columns become measures.
+   *
+   * @param fields table columns
+   * @param tableProperties table properties from TBLPROPERTIES
+   * @return the measure fields
+   */
+  protected def extractMsrColsFromFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[Field] = {
+    var msrFields: Seq[Field] = Seq[Field]()
+    var dictIncludedCols: Array[String] = Array[String]()
+    var dictExcludedCols: Array[String] = Array[String]()
+
+    // get all included cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+    }
+
+    // get all excluded cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+    }
+
+    // by default all non-string columns are measures; dictionary include/exclude columns are
+    // treated as dimensions instead
+    fields.foreach(field => {
+      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
+        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
+          msrFields :+= field
+        }
+      }
+    })
+
+    msrFields
+  }
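+
+  // Illustrative example: continuing the fields above (name string, id int, ts timestamp),
+  // only id becomes a measure, since name and ts are detected as dimension data types.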
+
+  /**
+   * Extracts the database name and table name.
+   *
+   * @param tableNameParts AST node holding the (optionally qualified) table name
+   * @return the optional database name and the table name
+   */
+  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
+    val (db, tableName) =
+      tableNameParts.getChildren.asScala.map {
+        case Token(part, Nil) => cleanIdentifier(part)
+      } match {
+        case Seq(tableOnly) => (None, tableOnly)
+        case Seq(databaseName, table) => (Some(databaseName), table)
+      }
+
+    (db, tableName)
+  }
+
+  protected def cleanIdentifier(ident: String): String = {
+    ident match {
+      case escapedIdentifier(i) => i
+      case plainIdent => plainIdent
+    }
+  }
+
+  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+    var remainingNodes = nodeList
+    val clauses = clauseNames.map { clauseName =>
+      val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
+      remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
+        matches.tail
+      } else {
+        Nil
+      })
+      matches.headOption
+    }
+
+    if (remainingNodes.nonEmpty) {
+      sys.error(
+        s"""Unhandled clauses:
+            |You are likely trying to use an unsupported carbon feature.""".stripMargin)
+    }
+    clauses
+  }
+
+  object Token {
+    /** @return matches of the form (tokenName, children). */
+    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
+      t match {
+        case t: ASTNode =>
+          CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
+          Some((t.getText,
+            Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Extracts the table properties from the TOK_TABLEPROPLIST token.
+   *
+   * @param node the table properties AST node
+   * @return the (key, value) pairs with quotes stripped
+   */
+  protected def getProperties(node: Node): Seq[(String, String)] = {
+    node match {
+      case Token("TOK_TABLEPROPLIST", list) =>
+        list.map {
+          case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+            (unquoteString(key) -> unquoteString(value))
+        }
+    }
+  }
+
+  protected def unquoteString(str: String) = {
+    str match {
+      case singleQuotedString(s) => s.toLowerCase()
+      case doubleQuotedString(s) => s.toLowerCase()
+      case other => other
+    }
+  }
+
+  protected def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
+
+    // validate with all supported options
+    val options = optionList.get.groupBy(x => x._1)
+    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
+      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
+      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
+    )
+    var isSupported = true
+    val invalidOptions = StringBuilder.newBuilder
+    options.foreach(value => {
+      if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
+        isSupported = false
+        invalidOptions.append(value._1)
+      }
+
+    }
+    )
+    if (!isSupported) {
+      val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    //  COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
+    if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
+        options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
+      val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
+                         " in options"
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
+      val maxColumns: String = options.get("maxcolumns").get(0)._2
+      try {
+        maxColumns.toInt
+      } catch {
+        case ex: NumberFormatException =>
+          throw new MalformedCarbonCommandException(
+            "option MAXCOLUMNS can only contain integer values")
+      }
+    }
+
+    // check for duplicate options
+    val duplicateOptions = options filter {
+      case (_, optionlist) => optionlist.size > 1
+    }
+    val duplicates = StringBuilder.newBuilder
+    if (duplicateOptions.nonEmpty) {
+      duplicateOptions.foreach(x => {
+        duplicates.append(x._1)
+      }
+      )
+      val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+  }
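+
+  // Illustrative example: options such as ('DELIMITER'=',', 'QUOTECHAR'='"') pass this
+  // validation, while an unknown option key, a non-integer MAXCOLUMNS value, a duplicated
+  // option, or combining COLUMNDICT with ALL_DICTIONARY_PATH raises
+  // MalformedCarbonCommandException.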
+
+  protected lazy val dbTableIdentifier: Parser[Seq[String]] =
+    (ident <~ ".").? ~ (ident) ^^ {
+      case databaseName ~ tableName =>
+        if (databaseName.isDefined) {
+          Seq(databaseName.get, tableName)
+        } else {
+          Seq(tableName)
+        }
+    }
+
+  protected lazy val loadOptions: Parser[(String, String)] =
+    (stringLit <~ "=") ~ stringLit ^^ {
+      case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
+      case _ => ("", "")
+    }
+
+
+  protected lazy val dimCol: Parser[Field] = anyFieldDef
+
+  protected lazy val primitiveTypes =
+    STRING ^^^ "string" | INTEGER ^^^ "integer" |
+    TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
+    BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
+    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
+
+  /**
+   * Matching the char(n) data type and returning the same.
+   */
+  private lazy val charType =
+    CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
+      case char ~ digit =>
+        s"$char($digit)"
+    }
+
+  /**
+   * Matching the decimal(10,0) data type and returning the same.
+   */
+  private lazy val decimalType =
+  DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
+    case decimal ~ precision ~ scale =>
+      s"$decimal($precision, $scale)"
+  }
+
+  protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
+                                                 primitiveFieldType
+
+  lazy val anyFieldDef: Parser[Field] =
+    (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
+      case e1 ~ e2 ~ e3 =>
+        Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
+    }
+
+  protected lazy val primitiveFieldType: Parser[Field] =
+    (primitiveTypes) ^^ {
+      case e1 =>
+        Field("unknown", Some(e1), Some("unknown"), Some(null))
+    }
+
+  protected lazy val arrayFieldType: Parser[Field] =
+    ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
+      case e1 =>
+        Field("unknown", Some("array"), Some("unknown"),
+          Some(List(Field("val", e1.dataType, Some("val"),
+            e1.children))))
+    }
+
+  protected lazy val structFieldType: Parser[Field] =
+    ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
+      case e1 =>
+        Field("unknown", Some("struct"), Some("unknown"), Some(e1))
+    }
+
+  protected lazy val measureCol: Parser[Field] =
+    (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
+                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
+    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
+      case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
+    }
+
+  private def normalizeType(field: Field): Field = {
+    val dataType = field.dataType.getOrElse("NIL")
+    dataType match {
+      case "string" =>
+        Field(field.column, Some("String"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "smallint" =>
+        Field(field.column, Some("SmallInt"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "integer" | "int" =>
+        Field(field.column, Some("Integer"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "timestamp" =>
+        Field(field.column, Some("Timestamp"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "array" =>
+        Field(field.column, Some("Array"), field.name,
+          field.children.map(f => f.map(normalizeType(_))),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "struct" =>
+        Field(field.column, Some("Struct"), field.name,
+          field.children.map(f => f.map(normalizeType(_))),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      // if the data type is given as decimal(precision,scale), extract the precision
+      // and scale and reset the data type to Decimal.
+      case _ if (dataType.startsWith("decimal")) =>
+        val (precision, scale) = getScaleAndPrecision(dataType)
+        Field(field.column,
+          Some("Decimal"),
+          field.name,
+          Some(null),
+          field.parent,
+          field.storeType, field.schemaOrdinal, precision,
+          scale,
+          field.rawSchema
+        )
+      case _ =>
+        field
+    }
+  }
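+
+  // Illustrative example: a field declared as salary decimal(10,2) is normalized to a Field
+  // with dataType Some("Decimal"), precision 10 and scale 2; array and struct children are
+  // normalized recursively.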
+
+  private def addParent(field: Field): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "Array" => Field(field.column, Some("Array"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType, field.schemaOrdinal)
+      case "Struct" => Field(field.column, Some("Struct"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType, field.schemaOrdinal)
+      case _ => field
+    }
+  }
+
+  private def appendParentForEachChild(field: Field, parentName: String): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "String" => Field(parentName + "." + field.column, Some("String"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Long" => Field(parentName + "." + field.column, Some("Long"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Double" => Field(parentName + "." + field.column, Some("Double"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Array" => Field(parentName + "." + field.column, Some("Array"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale)
+      case _ => field
+    }
+  }
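+
+  // Illustrative example: for a struct column address with a child city, addParent rewrites
+  // the child as address.city with address recorded as its parent; nested arrays and structs
+  // are renamed the same way recursively.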
+
+  protected lazy val segmentId: Parser[String] =
+    numericLit ^^ { u => u } |
+    elem("decimal", p => {
+      p.getClass.getSimpleName.equals("FloatLit") ||
+      p.getClass.getSimpleName.equals("DecimalLit")
+    }) ^^ (_.chars)
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 531b691..f646f1d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -56,7 +56,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     children: Option[List[Field]], parent: String = null,
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
-    var precision: Int = 0, var scale: Int = 0)
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
 
 case class ColumnProperty(key: String, value: String)
 
@@ -108,12 +108,12 @@ case class CompactionCallableModel(storePath: String,
     compactionType: CompactionType)
 
 object TableNewProcessor {
-  def apply(cm: TableModel, sqlContext: SQLContext): TableInfo = {
-    new TableNewProcessor(cm, sqlContext).process
+  def apply(cm: TableModel): TableInfo = {
+    new TableNewProcessor(cm).process
   }
 }
 
-class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
+class TableNewProcessor(cm: TableModel) {
 
   var index = 0
   var rowGroup = 0


[2/4] incubator-carbondata git commit: Initial commit

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 21864d1..16e35f4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -17,13 +17,9 @@
 
 package org.apache.spark.sql
 
-import java.util.regex.{Matcher, Pattern}
-
 import scala.collection.JavaConverters._
-import scala.collection.mutable.LinkedHashSet
 import scala.collection.mutable.Map
 import scala.language.implicitConversions
-import scala.util.matching.Regex
 
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
@@ -31,154 +27,18 @@ import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveQlWrapper
 
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Parser for All Carbon DDL, DML cases in Unified context
  */
-class CarbonSqlParser() extends AbstractSparkSQLParser {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  protected val AGGREGATE = carbonKeyWord("AGGREGATE")
-  protected val AS = carbonKeyWord("AS")
-  protected val AGGREGATION = carbonKeyWord("AGGREGATION")
-  protected val ALL = carbonKeyWord("ALL")
-  protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
-  protected val BEFORE = carbonKeyWord("BEFORE")
-  protected val BY = carbonKeyWord("BY")
-  protected val CARDINALITY = carbonKeyWord("CARDINALITY")
-  protected val CASCADE = carbonKeyWord("CASCADE")
-  protected val CLASS = carbonKeyWord("CLASS")
-  protected val CLEAN = carbonKeyWord("CLEAN")
-  protected val COLS = carbonKeyWord("COLS")
-  protected val COLUMNS = carbonKeyWord("COLUMNS")
-  protected val CREATE = carbonKeyWord("CREATE")
-  protected val CUBE = carbonKeyWord("CUBE")
-  protected val CUBES = carbonKeyWord("CUBES")
-  protected val DATA = carbonKeyWord("DATA")
-  protected val DATABASE = carbonKeyWord("DATABASE")
-  protected val DATABASES = carbonKeyWord("DATABASES")
-  protected val DELETE = carbonKeyWord("DELETE")
-  protected val DELIMITER = carbonKeyWord("DELIMITER")
-  protected val DESCRIBE = carbonKeyWord("DESCRIBE")
-  protected val DESC = carbonKeyWord("DESC")
-  protected val DETAIL = carbonKeyWord("DETAIL")
-  protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
-  protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
-  protected val DROP = carbonKeyWord("DROP")
-  protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
-  protected val EXCLUDE = carbonKeyWord("EXCLUDE")
-  protected val EXPLAIN = carbonKeyWord("EXPLAIN")
-  protected val EXTENDED = carbonKeyWord("EXTENDED")
-  protected val FORMATTED = carbonKeyWord("FORMATTED")
-  protected val FACT = carbonKeyWord("FACT")
-  protected val FIELDS = carbonKeyWord("FIELDS")
-  protected val FILEHEADER = carbonKeyWord("FILEHEADER")
-  protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
-  protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
-  protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
-  protected val FILES = carbonKeyWord("FILES")
-  protected val FROM = carbonKeyWord("FROM")
-  protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
-  protected val IN = carbonKeyWord("IN")
-  protected val INCLUDE = carbonKeyWord("INCLUDE")
-  protected val INPATH = carbonKeyWord("INPATH")
-  protected val INTO = carbonKeyWord("INTO")
-  protected val LEVELS = carbonKeyWord("LEVELS")
-  protected val LIKE = carbonKeyWord("LIKE")
-  protected val LOAD = carbonKeyWord("LOAD")
-  protected val LOCAL = carbonKeyWord("LOCAL")
-  protected val MAPPED = carbonKeyWord("MAPPED")
-  protected val MEASURES = carbonKeyWord("MEASURES")
-  protected val MULTILINE = carbonKeyWord("MULTILINE")
-  protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
-  protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
-  protected val OPTIONS = carbonKeyWord("OPTIONS")
-  protected val OUTPATH = carbonKeyWord("OUTPATH")
-  protected val OVERWRITE = carbonKeyWord("OVERWRITE")
-  protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
-  protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
-  protected val PARTITIONER = carbonKeyWord("PARTITIONER")
-  protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
-  protected val RELATION = carbonKeyWord("RELATION")
-  protected val SCHEMA = carbonKeyWord("SCHEMA")
-  protected val SCHEMAS = carbonKeyWord("SCHEMAS")
-  protected val SHOW = carbonKeyWord("SHOW")
-  protected val TABLES = carbonKeyWord("TABLES")
-  protected val TABLE = carbonKeyWord("TABLE")
-  protected val TERMINATED = carbonKeyWord("TERMINATED")
-  protected val TYPE = carbonKeyWord("TYPE")
-  protected val USE = carbonKeyWord("USE")
-  protected val WHERE = carbonKeyWord("WHERE")
-  protected val WITH = carbonKeyWord("WITH")
-  protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
-  protected val ABS = carbonKeyWord("abs")
-
-  protected val FOR = carbonKeyWord("FOR")
-  protected val SCRIPTS = carbonKeyWord("SCRIPTS")
-  protected val USING = carbonKeyWord("USING")
-  protected val LIMIT = carbonKeyWord("LIMIT")
-  protected val DEFAULTS = carbonKeyWord("DEFAULTS")
-  protected val ALTER = carbonKeyWord("ALTER")
-  protected val ADD = carbonKeyWord("ADD")
-
-  protected val IF = carbonKeyWord("IF")
-  protected val NOT = carbonKeyWord("NOT")
-  protected val EXISTS = carbonKeyWord("EXISTS")
-  protected val DIMENSION = carbonKeyWord("DIMENSION")
-  protected val STARTTIME = carbonKeyWord("STARTTIME")
-  protected val SEGMENTS = carbonKeyWord("SEGMENTS")
-  protected val SEGMENT = carbonKeyWord("SEGMENT")
-
-  protected val STRING = carbonKeyWord("STRING")
-  protected val INTEGER = carbonKeyWord("INTEGER")
-  protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
-  protected val DATE = carbonKeyWord("DATE")
-  protected val CHAR = carbonKeyWord("CHAR")
-  protected val NUMERIC = carbonKeyWord("NUMERIC")
-  protected val DECIMAL = carbonKeyWord("DECIMAL")
-  protected val DOUBLE = carbonKeyWord("DOUBLE")
-  protected val SHORT = carbonKeyWord("SMALLINT")
-  protected val INT = carbonKeyWord("INT")
-  protected val BIGINT = carbonKeyWord("BIGINT")
-  protected val ARRAY = carbonKeyWord("ARRAY")
-  protected val STRUCT = carbonKeyWord("STRUCT")
-
-  protected val doubleQuotedString = "\"([^\"]+)\"".r
-  protected val singleQuotedString = "'([^']+)'".r
-
-  protected val newReservedWords =
-    this.getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = {
-    val sqllex = new SqlLexical()
-    sqllex.initialize(newReservedWords)
-    sqllex
-
-  }
-
-  import lexical.Identifier
-
-  implicit def regexToParser(regex: Regex): Parser[String] = {
-    acceptMatch(
-    s"identifier matching regex ${ regex }",
-    { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
-    )
-  }
+class CarbonSqlParser() extends CarbonDDLSqlParser {
 
   override def parse(input: String): LogicalPlan = {
     synchronized {
@@ -196,26 +56,14 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     }
   }
 
-  /**
-   * This will convert key word to regular expression.
-   *
-   * @param keys
-   * @return
-   */
-  private def carbonKeyWord(keys: String) = {
-    ("(?i)" + keys).r
-  }
-
   override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
-  protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
-                                                         loadManagement | describeTable |
-                                                         showLoads | alterTable | createTable
-
-  protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
-                                                           cleanFiles | loadDataNew
+  protected lazy val startCommand: Parser[LogicalPlan] =
+    createDatabase | dropDatabase | loadManagement | describeTable |
+    showLoads | alterTable | createTable
 
-  protected val escapedIdentifier = "`([^`]+)`".r
+  protected lazy val loadManagement: Parser[LogicalPlan] =
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
 
   protected lazy val createDatabase: Parser[LogicalPlan] =
     CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ {
@@ -253,19 +101,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         DropDatabase(dbName, isCascade, dropDbSql)
     }
 
-  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
-    var complexDimensions: Seq[Field] = Seq()
-    var dimensions: Seq[Field] = Seq()
-    dims.foreach { dimension =>
-      dimension.dataType.getOrElse("NIL") match {
-        case "Array" => complexDimensions = complexDimensions :+ dimension
-        case "Struct" => complexDimensions = complexDimensions :+ dimension
-        case _ => dimensions = dimensions :+ dimension
-      }
-    }
-    dimensions ++ complexDimensions
-  }
-
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> restInput ^^ {
       case statement =>
@@ -303,14 +138,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
       }
   }
 
-  private def getScaleAndPrecision(dataType: String): (Int, Int) = {
-    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
-    m.find()
-    val matchedString: String = m.group(1)
-    val scaleAndPrecision = matchedString.split(",")
-    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
-  }
-
   /**
    * This function will traverse the tree and logical plan will be formed using that.
    *
@@ -386,6 +213,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
                   if(f.dataType.getOrElse("").startsWith("char")) {
                     f.dataType = Some("char")
                   }
+                  f.rawSchema = x
                   fields ++= Seq(f)
                 }
               }
@@ -494,511 +322,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
     }
   }
 
-  /**
-   * This will prepate the Model from the Tree details.
-   *
-   * @param ifNotExistPresent
-   * @param dbName
-   * @param tableName
-   * @param fields
-   * @param partitionCols
-   * @param tableProperties
-   * @return
-   */
-  protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-      , tableName: String, fields: Seq[Field],
-      partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]): TableModel
-  = {
-
-    fields.zipWithIndex.foreach { x =>
-      x._1.schemaOrdinal = x._2
-    }
-    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
-      fields, tableProperties)
-    if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${
-        dbName.getOrElse(
-          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-      }.$tableName"
-                                                +
-                                                " can not be created without key columns. Please " +
-                                                "use DICTIONARY_INCLUDE or " +
-                                                "DICTIONARY_EXCLUDE to set at least one key " +
-                                                "column " +
-                                                "if all specified columns are numeric types")
-    }
-    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
-
-    // column properties
-    val colProps = extractColumnProperties(fields, tableProperties)
-    // get column groups configuration from table properties.
-    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
-      noDictionaryDims, msrs, dims)
-
-    // get no inverted index columns from table properties.
-    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
-
-    // validate the tableBlockSize from table properties
-    CommonUtil.validateTableBlockSize(tableProperties)
-
-    TableModel(
-      ifNotExistPresent,
-      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
-      dbName,
-      tableName,
-      tableProperties,
-      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
-      msrs.map(f => normalizeType(f)),
-      Option(noDictionaryDims),
-      Option(noInvertedIdxCols),
-      groupCols,
-      Some(colProps))
-  }
-
-  /**
-   * Extract the column groups configuration from table properties.
-   * Based on this Row groups of fields will be determined.
-   *
-   * @param tableProperties
-   * @return
-   */
-  protected def updateColumnGroupsInField(tableProperties: Map[String, String],
-      noDictionaryDims: Seq[String],
-      msrs: Seq[Field],
-      dims: Seq[Field]): Seq[String] = {
-    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
-
-      var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
-
-      // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
-      // here first splitting the value by () . so that the above will be splitted into 2 strings.
-      // [col1,col2] [col3,col4]
-      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
-      while (m.find()) {
-        val oneGroup: String = m.group(1)
-        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
-        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
-        splittedColGrps :+= arrangedColGrp
-      }
-      // This will  be furthur handled.
-      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
-    } else {
-      null
-    }
-  }
-
-  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
-    // if columns in column group is not in schema order than arrange it in schema order
-    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
-    colGroup.split(',').map(_.trim).foreach { x =>
-      dims.zipWithIndex.foreach { dim =>
-        if (dim._1.column.equalsIgnoreCase(x)) {
-          colGrpFieldIndx :+= dim._2
-        }
-      }
-    }
-    // sort it
-    colGrpFieldIndx = colGrpFieldIndx.sorted
-    // check if columns in column group is in schema order
-    if (!checkIfInSequence(colGrpFieldIndx)) {
-      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
-    }
-    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
-      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
-        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
-          throw new MalformedCarbonCommandException(
-            "Invalid column group,column in group should be contiguous as per schema.")
-        }
-      }
-      true
-    }
-    val colGrpNames: StringBuilder = StringBuilder.newBuilder
-    for (i <- colGrpFieldIndx.indices) {
-      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
-      if (i < (colGrpFieldIndx.length - 1)) {
-        colGrpNames.append(",")
-      }
-    }
-    colGrpNames.toString()
-  }
-
-  /**
-   * For getting the partitioner Object
-   *
-   * @param partitionCols
-   * @param tableProperties
-   * @return
-   */
-  protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]):
-  Option[Partitioner] = {
-
-    // by default setting partition class empty.
-    // later in table schema it is setting to default value.
-    var partitionClass: String = ""
-    var partitionCount: Int = 1
-    var partitionColNames: Array[String] = Array[String]()
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
-      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
-    }
-
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
-      try {
-        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
-      } catch {
-        case e: Exception => // no need to do anything.
-      }
-    }
-
-    partitionCols.foreach(col =>
-      partitionColNames :+= col.partitionColumn
-    )
-
-    // this means user has given partition cols list
-    if (!partitionColNames.isEmpty) {
-      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
-    }
-    // if partition cols are not given then no need to do partition.
-    None
-  }
-
-  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
-  java.util.Map[String, java.util.List[ColumnProperty]] = {
-    val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
-    fields.foreach { field =>
-      if (field.children.isDefined && field.children.get != null) {
-        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
-      } else {
-        fillColumnProperty(None, field.column, tableProperties, colPropMap)
-      }
-    }
-    colPropMap
-  }
-
-  protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
-      tableProperties: Map[String, String],
-      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
-    fieldChildren.foreach(fields => {
-      fields.foreach(field => {
-        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
-      }
-      )
-    }
-    )
-  }
-
-  protected def fillColumnProperty(parentColumnName: Option[String],
-      columnName: String,
-      tableProperties: Map[String, String],
-      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
-    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
-    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
-    if (colProps.isDefined) {
-      colPropMap.put(colProKey, colProps.get)
-    }
-  }
-
-  def getKey(parentColumnName: Option[String],
-      columnName: String): (String, String) = {
-    if (parentColumnName.isDefined) {
-      if (columnName == "val") {
-        (parentColumnName.get, parentColumnName.get + "." + columnName)
-      } else {
-        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
-      }
-    } else {
-      (columnName, columnName)
-    }
-  }
-
-  /**
-   * This will extract the no inverted columns fields.
-   * By default all dimensions use inverted index.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractNoInvertedIndexColumns(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  Seq[String] = {
-    // check whether the column name is in fields
-    var noInvertedIdxColsProps: Array[String] = Array[String]()
-    var noInvertedIdxCols: Seq[String] = Seq[String]()
-
-    if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
-      noInvertedIdxColsProps =
-        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
-      noInvertedIdxColsProps
-        .map { noInvertedIdxColProp =>
-          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
-            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
-                           " does not exist in table. Please check create table statement."
-            throw new MalformedCarbonCommandException(errormsg)
-          }
-        }
-    }
-    // check duplicate columns and only 1 col left
-    val distinctCols = noInvertedIdxColsProps.toSet
-    // extract the no inverted index columns
-    fields.foreach(field => {
-      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        noInvertedIdxCols :+= field.column
-      }
-    }
-    )
-    noInvertedIdxCols
-  }
-
-  /**
-   * This will extract the Dimensions and NoDictionary Dimensions fields.
-   * By default all string cols are dimensions.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-      tableProperties: Map[String, String]):
-  (Seq[Field], Seq[String]) = {
-    var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
-    var dictExcludeCols: Array[String] = Array[String]()
-    var noDictionaryDims: Seq[String] = Seq[String]()
-    var dictIncludeCols: Seq[String] = Seq[String]()
-
-    // All excluded cols should be there in create table cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-      dictExcludeCols
-        .map { dictExcludeCol =>
-          if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
-            val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
-                           " does not exist in table. Please check create table statement."
-            throw new MalformedCarbonCommandException(errormsg)
-          } else {
-            val dataType = fields.find(x =>
-              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
-            if (isComplexDimDictionaryExclude(dataType)) {
-              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
-                             dictExcludeCol
-              throw new MalformedCarbonCommandException(errormsg)
-            } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
-              val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
-                             " data type column: " + dictExcludeCol
-              throw new MalformedCarbonCommandException(errorMsg)
-            }
-          }
-        }
-    }
-    // All included cols should be there in create table cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
-      dictIncludeCols.map { distIncludeCol =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
-          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
-                         " does not exist in table. Please check create table statement."
-          throw new MalformedCarbonCommandException(errormsg)
-        }
-      }
-    }
-
-    // include cols should contain exclude cols
-    dictExcludeCols.foreach { dicExcludeCol =>
-      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
-        val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
-                       " with DICTIONARY_INCLUDE. Please check create table statement."
-        throw new MalformedCarbonCommandException(errormsg)
-      }
-    }
-
-    // by default consider all String cols as dims and if any dictionary exclude is present then
-    // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
-    fields.foreach(field => {
-
-      if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
-        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
-        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
-          noDictionaryDims :+= field.column
-        }
-        dimFields += field
-      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        dimFields += (field)
-      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
-        dimFields += (field)
-      }
-    }
-    )
-
-    (dimFields.toSeq, noDictionaryDims)
-  }
-
-  /**
-   * It fills non string dimensions in dimFields
-   */
-  def fillNonStringDimension(dictIncludeCols: Seq[String],
-      field: Field, dimFields: LinkedHashSet[Field]) {
-    var dictInclude = false
-    if (dictIncludeCols.nonEmpty) {
-      dictIncludeCols.foreach(dictIncludeCol =>
-        if (field.column.equalsIgnoreCase(dictIncludeCol)) {
-          dictInclude = true
-        })
-    }
-    if (dictInclude) {
-      dimFields += field
-    }
-  }
-
-  /**
-   * detect dimention data type
-   *
-   * @param dimensionDatatype
-   */
-  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
-    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
-    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
-  }
-
-  /**
-   * detects whether complex dimension is part of dictionary_exclude
-   */
-  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
-    val dimensionType = Array("array", "struct")
-    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
-  }
-
-  /**
-   * detects whether double or decimal column is part of dictionary_exclude
-   */
-  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
-    val dataTypes = Array("string", "timestamp")
-    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
-  }
-
-  /**
-   * Extract the Measure Cols fields. By default all non string cols will be measures.
-   *
-   * @param fields
-   * @param tableProperties
-   * @return
-   */
-  protected def extractMsrColsFromFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): Seq[Field] = {
-    var msrFields: Seq[Field] = Seq[Field]()
-    var dictIncludedCols: Array[String] = Array[String]()
-    var dictExcludedCols: Array[String] = Array[String]()
-
-    // get all included cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
-    }
-
-    // get all excluded cols
-    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-    }
-
-    // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
-    fields.foreach(field => {
-      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
-            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-          msrFields :+= field
-        }
-      }
-    })
-
-    msrFields
-  }
-
-  /**
-   * Extract the DbName and table name.
-   *
-   * @param tableNameParts
-   * @return
-   */
-  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
-    val (db, tableName) =
-      tableNameParts.getChildren.asScala.map {
-        case Token(part, Nil) => cleanIdentifier(part)
-      } match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
-
-    (db, tableName)
-  }
-
-  protected def cleanIdentifier(ident: String): String = {
-    ident match {
-      case escapedIdentifier(i) => i
-      case plainIdent => plainIdent
-    }
-  }
-
-  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
-    var remainingNodes = nodeList
-    val clauses = clauseNames.map { clauseName =>
-      val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
-      remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
-        matches.tail
-      } else {
-        Nil
-      })
-      matches.headOption
-    }
-
-    if (remainingNodes.nonEmpty) {
-      sys.error(
-        s"""Unhandled clauses:
-            |You are likely trying to use an unsupported carbon feature."""".stripMargin)
-    }
-    clauses
-  }
-
-  object Token {
-    /** @return matches of the form (tokenName, children). */
-    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
-      t match {
-        case t: ASTNode =>
-          CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
-          Some((t.getText,
-            Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
-        case _ => None
-      }
-    }
-  }
-
-  /**
-   * Extract the table properties token
-   *
-   * @param node
-   * @return
-   */
-  protected def getProperties(node: Node): Seq[(String, String)] = {
-    node match {
-      case Token("TOK_TABLEPROPLIST", list) =>
-        list.map {
-          case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
-            (unquoteString(key) -> unquoteString(value))
-        }
-    }
-  }
-
-  protected def unquoteString(str: String) = {
-    str match {
-      case singleQuotedString(s) => s.toLowerCase()
-      case doubleQuotedString(s) => s.toLowerCase()
-      case other => other
-    }
-  }
-
   protected lazy val loadDataNew: Parser[LogicalPlan] =
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
     (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
@@ -1015,143 +338,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
           isOverwrite.isDefined)
     }
 
-  private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
-
-    // validate with all supported options
-    val options = optionList.get.groupBy(x => x._1)
-    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
-      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
-      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
-    )
-    var isSupported = true
-    val invalidOptions = StringBuilder.newBuilder
-    options.foreach(value => {
-      if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
-        isSupported = false
-        invalidOptions.append(value._1)
-      }
-
-    }
-    )
-    if (!isSupported) {
-      val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    //  COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
-    if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
-        options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
-      val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
-                         " in options"
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-
-    if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
-      val maxColumns: String = options.get("maxcolumns").get(0)._2
-      try {
-        maxColumns.toInt
-      } catch {
-        case ex: NumberFormatException =>
-          throw new MalformedCarbonCommandException(
-            "option MAXCOLUMNS can only contain integer values")
-      }
-    }
-
-    // check for duplicate options
-    val duplicateOptions = options filter {
-      case (_, optionlist) => optionlist.size > 1
-    }
-    val duplicates = StringBuilder.newBuilder
-    if (duplicateOptions.nonEmpty) {
-      duplicateOptions.foreach(x => {
-        duplicates.append(x._1)
-      }
-      )
-      val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
-      throw new MalformedCarbonCommandException(errorMessage)
-    }
-  }
-
-  protected lazy val dbTableIdentifier: Parser[Seq[String]] =
-    (ident <~ ".").? ~ (ident) ^^ {
-      case databaseName ~ tableName =>
-        if (databaseName.isDefined) {
-          Seq(databaseName.get, tableName)
-        } else {
-          Seq(tableName)
-        }
-    }
-
-  protected lazy val loadOptions: Parser[(String, String)] =
-    (stringLit <~ "=") ~ stringLit ^^ {
-      case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
-      case _ => ("", "")
-    }
-
-
-  protected lazy val dimCol: Parser[Field] = anyFieldDef
-
-  protected lazy val primitiveTypes =
-    STRING ^^^ "string" | INTEGER ^^^ "integer" |
-    TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
-    BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
-    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
-
-  /**
-   * Matching the char data type (e.g. char(10)) and returning the same.
-   */
-  private lazy val charType =
-    CHAR ~ ("(" ~>numericLit <~ ")").? ^^ {
-      case char ~ digit =>
-        s"$char($digit)"
-    }
-
-  /**
-   * Matching the decimal(10,0) data type and returning the same.
-   */
-  private lazy val decimalType =
-  DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
-    case decimal ~ precision ~ scale =>
-      s"$decimal($precision, $scale)"
-  }
-
-  protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
-                                                 primitiveFieldType
-
-  protected lazy val anyFieldDef: Parser[Field] =
-    (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
-      case e1 ~ e2 ~ e3 =>
-        Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
-    }
-
-  protected lazy val primitiveFieldType: Parser[Field] =
-    (primitiveTypes) ^^ {
-      case e1 =>
-        Field("unknown", Some(e1), Some("unknown"), Some(null))
-    }
-
-  protected lazy val arrayFieldType: Parser[Field] =
-    ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
-      case e1 =>
-        Field("unknown", Some("array"), Some("unknown"),
-          Some(List(Field("val", e1.dataType, Some("val"),
-            e1.children))))
-    }
-
-  protected lazy val structFieldType: Parser[Field] =
-    ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
-      case e1 =>
-        Field("unknown", Some("struct"), Some("unknown"), Some(e1))
-    }
-
-  protected lazy val measureCol: Parser[Field] =
-    (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
-                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
-    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
-      case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
-    }
-
   protected lazy val describeTable: Parser[LogicalPlan] =
     ((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ {
       case ef ~ db ~ tbl =>
@@ -1169,109 +355,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         }
     }
 
-  private def normalizeType(field: Field): Field = {
-    val dataType = field.dataType.getOrElse("NIL")
-    dataType match {
-      case "string" => Field(field.column, Some("String"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "smallint" => Field(field.column, Some("SmallInt"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "integer" | "int" => Field(field.column, Some("Integer"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "timestamp" => Field(field.column, Some("Timestamp"), field.name, Some(null),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
-        field.parent, field.storeType, field.schemaOrdinal
-      )
-      case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal
-      )
-      case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale
-      )
-      // checking if the nested data type contains the child type as decimal(10,0),
-      // if it is present then extracting the precision and scale. resetting the data type
-      // with Decimal.
-      case _ if (dataType.startsWith("decimal")) =>
-        val (precision, scale) = getScaleAndPrecision(dataType)
-        Field(field.column,
-          Some("Decimal"),
-          field.name,
-          Some(null),
-          field.parent,
-          field.storeType, field.schemaOrdinal, precision,
-          scale
-        )
-      case _ =>
-        field
-    }
-  }
-
-  private def addParent(field: Field): Field = {
-    field.dataType.getOrElse("NIL") match {
-      case "Array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal)
-      case "Struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal)
-      case _ => field
-    }
-  }
-
-  private def appendParentForEachChild(field: Field, parentName: String): Field = {
-    field.dataType.getOrElse("NIL") match {
-      case "String" => Field(parentName + "." + field.column, Some("String"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Long" => Field(parentName + "." + field.column, Some("Long"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Double" => Field(parentName + "." + field.column, Some("Double"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Array" => Field(parentName + "." + field.column, Some("Array"),
-        Some(parentName + "." + field.name.getOrElse(None)),
-        field.children
-          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
-        parentName)
-      case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
-        Some(parentName + "." + field.name.getOrElse(None)),
-        field.children
-          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
-        parentName)
-      case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
-      case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
-        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale)
-      case _ => field
-    }
-  }
-
   protected lazy val showLoads: Parser[LogicalPlan] =
     SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (LIMIT ~> numericLit).? <~
@@ -1280,13 +363,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
         ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
     }
 
-  protected lazy val segmentId: Parser[String] =
-    numericLit ^^ { u => u } |
-    elem("decimal", p => {
-      p.getClass.getSimpleName.equals("FloatLit") ||
-      p.getClass.getSimpleName.equals("DecimalLit")
-    }) ^^ (_.chars)
-
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
     DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
                                                    (ident <~ ".").? ~ ident) <~

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d82290e..8ad1203 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -139,7 +139,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm, sqlContext)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
 
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 88e43fd..2943a52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.types.{StringType, TimestampType}
 
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
@@ -39,3 +42,34 @@ abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
 case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
+object getDB {
+
+  def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
+    dbName.getOrElse(
+      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
+  }
+}
+
+/**
+ * Shows Loads in a table
+ */
+case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
+  extends Command {
+
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time", TimestampType, nullable = false)())
+  }
+}
+
+/**
+ * Describe formatted for hive table
+ */
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends Command {
+
+  override def output: Seq[AttributeReference] =
+    Seq(AttributeReference("result", StringType, nullable = false)())
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1fa710e..976a1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -41,13 +41,13 @@ object CarbonEnv {
 
   var initialized = false
 
-  def init(sqlContext: SQLContext): Unit = {
+  def init(sparkSession: SparkSession): Unit = {
     if (!initialized) {
       val catalog = {
         val storePath =
           CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
+        new CarbonMetastore(sparkSession.conf, storePath)
       }
       carbonEnv = CarbonEnv(catalog)
       initialized = true

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
new file mode 100644
index 0000000..748d292
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.internal.SessionState
+
+/**
+ * Session implementation for {org.apache.spark.sql.SparkSession}
+ * This class is implemented only so that Carbon's own SQL DDL commands can be used.
+ * Users need to call {CarbonSession.getOrCreateCarbonSession} to create a Carbon session.
+ * @param sc
+ */
+class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+
+  CarbonEnv.init(this)
+
+  @transient
+  override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+}
+
+object CarbonSession {
+
+  implicit class CarbonBuilder(builder: Builder) {
+
+
+    def getOrCreateCarbonSession(): SparkSession = synchronized {
+
+      val options =
+        getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+      val userSuppliedContext: Option[SparkContext] =
+        getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]]
+
+      // Get the session from current thread's active session.
+      var session: SparkSession = SparkSession.getActiveSession match {
+        case Some(session) =>
+          if ((session ne null) && !session.sparkContext.isStopped) {
+            options.foreach { case (k, v) => session.conf.set(k, v) }
+            session
+          } else {
+            null
+          }
+        case _ => null
+      }
+      if (session ne null) {
+        return session
+      }
+
+      // Global synchronization so we will only set the default session once.
+      SparkSession.synchronized {
+        // If the current thread does not have an active session, get it from the global session.
+        session = SparkSession.getDefaultSession match {
+          case Some(session) =>
+            if ((session ne null) && !session.sparkContext.isStopped) {
+              options.foreach { case (k, v) => session.conf.set(k, v) }
+              session
+            } else {
+              null
+            }
+          case _ => null
+        }
+        if (session ne null) {
+          return session
+        }
+
+        // No active nor global default session. Create a new one.
+        val sparkContext = userSuppliedContext.getOrElse {
+          // set app name if not given
+          val randomAppName = java.util.UUID.randomUUID().toString
+          val sparkConf = new SparkConf()
+          options.foreach { case (k, v) => sparkConf.set(k, v) }
+          if (!sparkConf.contains("spark.app.name")) {
+            sparkConf.setAppName(randomAppName)
+          }
+          val sc = SparkContext.getOrCreate(sparkConf)
+          // this may be an existing SparkContext; update its SparkConf, which may be used
+          // by the SparkSession
+          options.foreach { case (k, v) => sc.conf.set(k, v) }
+          if (!sc.conf.contains("spark.app.name")) {
+            sc.conf.setAppName(randomAppName)
+          }
+          sc
+        }
+        session = new CarbonSession(sparkContext)
+        options.foreach { case (k, v) => session.conf.set(k, v) }
+        SparkSession.setDefaultSession(session)
+
+        // Register a successfully instantiated context to the singleton. This should be at the
+        // end of the class definition so that the singleton is updated only if there is no
+        // exception in the construction of the instance.
+        sparkContext.addSparkListener(new SparkListener {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+            SparkSession.setDefaultSession(null)
+            SparkSession.sqlListener.set(null)
+          }
+        })
+      }
+
+      return session
+    }
+
+    /**
+     * It is a hack to read a private field of the builder via reflection.
+     */
+    def getValue(name: String, builder: Builder): Any = {
+      val currentMirror = scala.reflect.runtime.currentMirror
+      val instanceMirror = currentMirror.reflect(builder)
+      val m = currentMirror.classSymbol(builder.getClass).
+        toType.members.find { p =>
+        p.name.toString.equals(name)
+      }.get.asTerm
+      instanceMirror.reflectField(m).get
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b639ea8..c78ddf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -48,7 +48,7 @@ class CarbonSource extends CreatableRelationProvider
                                mode: SaveMode,
                                parameters: Map[String, String],
                                data: DataFrame): BaseRelation = {
-    CarbonEnv.init(sqlContext)
+    CarbonEnv.init(sqlContext.sparkSession)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -88,7 +88,7 @@ class CarbonSource extends CreatableRelationProvider
                                sqlContext: SQLContext,
                                parameters: Map[String, String],
                                dataSchema: StructType): BaseRelation = {
-    CarbonEnv.init(sqlContext)
+    CarbonEnv.init(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
     val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
     CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -97,8 +97,10 @@ class CarbonSource extends CreatableRelationProvider
   }
 
   private def addLateDecodeOptimization(ss: SparkSession): Unit = {
-    ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
-    ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+    if (ss.sessionState.experimentalMethods.extraStrategies.isEmpty) {
+      ss.sessionState.experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy)
+      ss.sessionState.experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+    }
   }
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
@@ -131,7 +133,7 @@ class CarbonSource extends CreatableRelationProvider
         val map = scala.collection.mutable.Map[String, String]();
         parameters.foreach { x => map.put(x._1, x._2) }
         val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
-        CreateTable(cm).run(sparkSession)
+        CreateTable(cm, false).run(sparkSession)
         CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 5508a94..51b79c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -31,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.{AtomicType, IntegerType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
new file mode 100644
index 0000000..5211f1a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Carbon strategies for ddl commands
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(LoadTable(identifier.database, identifier.table, path, Seq(),
+          Map(), isOverwrite)) :: Nil
+      case DropTableCommand(identifier, ifNotExists, isView)
+        if CarbonEnv.get.carbonMetastore
+          .isTablePathExists(identifier)(sparkSession) =>
+        ExecutedCommandExec(
+          CarbonDropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
+      case ShowLoadsCommand(databaseName, table, limit) =>
+        ExecutedCommandExec(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
+      case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+        CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+        ExecutedCommandExec(createDb) :: Nil
+      case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
+        if (isCascade) {
+          val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+            .filter(_.database.exists(_.equalsIgnoreCase(dbName)))
+          tablesInDB.foreach{tableName =>
+            CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
+          }
+        }
+        CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+        ExecutedCommandExec(drop) :: Nil
+      case alterTable@AlterTableCompaction(altertablemodel) =>
+        val isCarbonTable = CarbonEnv.get.carbonMetastore
+          .tableExists(TableIdentifier(altertablemodel.tableName,
+            altertablemodel.dbName))(sparkSession)
+        if (isCarbonTable) {
+          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
+              altertablemodel.compactionType.equalsIgnoreCase("major")) {
+            ExecutedCommandExec(alterTable) :: Nil
+          } else {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on carbon table")
+          }
+        } else {
+          throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+        }
+      case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
+        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+        val resolvedTable =
+          sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+        val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+        ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+      case _ => Nil
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 68ad4d6..8f97961 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,18 +17,27 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+import java.text.SimpleDateFormat
+
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.util.FileUtils
+import org.codehaus.jackson.map.ObjectMapper
 
+import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
@@ -36,19 +45,34 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
 
+object Checker {
+  def validateTableExists(
+      dbName: Option[String],
+      tableName: String,
+      session: SparkSession): Unit = {
+    val identifier = TableIdentifier(tableName, dbName)
+    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+      val err = s"table $dbName.$tableName not found"
+      LogServiceFactory.getLogService(this.getClass.getName).error(err)
+      throw new IllegalArgumentException(err)
+    }
+  }
+}
+
 /**
  * Command for the compaction in alter table command
  *
  * @param alterTableModel
  */
-case class AlterTableCompaction(alterTableModel: AlterTableModel) {
+case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -109,24 +133,59 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
   }
 }
 
-case class CreateTable(cm: TableModel) {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    cm.databaseName = cm.databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    CarbonEnv.init(sparkSession)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
     val dbName = cm.databaseName
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm, sparkSession.sqlContext)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
 
     if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
       sys.error("No Dimensions found. Table should have at least one dimesnion !")
     }
-    // Add Database to catalog and persist
-    val catalog = CarbonEnv.get.carbonMetastore
-    val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
-    LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.get.carbonMetastore
+      // Need to fill partitioner class when we support partition
+      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      if (createDSTable) {
+        try {
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+                |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
+                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // call the drop table to delete the created table.
+
+            CarbonEnv.get.carbonMetastore
+              .dropTable(catalog.storePath, identifier)(sparkSession)
+
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+    }
+
     Seq.empty
   }
 
@@ -136,6 +195,54 @@ case class CreateTable(cm: TableModel) {
   }
 }
 
+case class DeleteLoadsById(
+    loadids: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadById(
+      loadids,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+
+  }
+
+  // validates load ids
+  private def validateLoadIds: Unit = {
+    if (loadids.isEmpty) {
+      val errorMessage = "Error: Segment id(s) should not be empty."
+      throw new MalformedCarbonCommandException(errorMessage)
+
+    }
+  }
+}
+
+case class DeleteLoadsByLoadDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+  }
+
+}
 
 object LoadTable {
 
@@ -205,7 +312,7 @@ case class LoadTable(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None) {
+    dataFrame: Option[DataFrame] = None) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -433,3 +540,191 @@ case class LoadTable(
     }
   }
 }
+
+private[sql] case class DeleteLoadByDate(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName
+    )
+    Seq.empty
+  }
+
+}
+
+case class CleanFiles(
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    CarbonStore.cleanFiles(
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      relation.asInstanceOf[CarbonRelation].tableMeta.storePath
+    )
+    Seq.empty
+  }
+}
+
+case class ShowLoads(
+    databaseNameOp: Option[String],
+    tableName: String,
+    limit: Option[String],
+    override val output: Seq[Attribute]) extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonStore.showSegments(
+      getDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      limit
+    )
+  }
+}
+
+case class CarbonDropTableCommand(ifExistsSet: Boolean,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends RunnableCommand {
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    var isLocked = false
+    try {
+      isLocked = carbonLock.lockWithRetries()
+      if (isLocked) {
+        logInfo("Successfully able to get the lock for drop.")
+      }
+      else {
+        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
+        sys.error("Table is locked for deletion. Please try after some time")
+      }
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    } finally {
+      if (carbonLock != null && isLocked) {
+        if (carbonLock.unlock()) {
+          logInfo("Table MetaData Unlocked Successfully after dropping the table")
+          // deleting any remaining files.
+          val metadataFilePath = CarbonStorePath
+            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+          val fileType = FileFactory.getFileType(metadataFilePath)
+          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
+          }
+          // delete bad record log after drop table
+          val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
+          val badLogFileType = FileFactory.getFileType(badLogPath)
+          if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
+            val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
+          }
+        } else {
+          logError("Unable to unlock Table MetaData")
+        }
+      }
+    }
+    Seq.empty
+  }
+}
+
+private[sql] case class DescribeCommandFormatted(
+    child: SparkPlan,
+    override val output: Seq[Attribute],
+    tblIdentifier: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val mapper = new ObjectMapper()
+    val colProps = StringBuilder.newBuilder
+    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
+      val comment = if (relation.metaData.dims.contains(field.name)) {
+        val dimension = relation.metaData.carbonTable.getDimensionByName(
+          relation.tableMeta.carbonTableIdentifier.getTableName,
+          field.name)
+        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
+          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
+          colProps.append(field.name).append(".")
+            .append(mapper.writeValueAsString(dimension.getColumnProperties))
+            .append(",")
+        }
+        if (dimension.hasEncoding(Encoding.DICTIONARY) &&
+            !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          "DICTIONARY, KEY COLUMN"
+        } else {
+          "KEY COLUMN"
+        }
+      } else {
+        ("MEASURE")
+      }
+      (field.name, field.dataType.simpleString, comment)
+    }
+    val colPropStr = if (colProps.toString().trim().length() > 0) {
+      // drops additional comma at end
+      colProps.toString().dropRight(1)
+    } else {
+      colProps.toString()
+    }
+    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
+      .getDatabaseName, "")
+    )
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
+    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
+    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
+    if (colPropStr.length() > 0) {
+      results ++= Seq((colPropStr, "", ""))
+    } else {
+      results ++= Seq(("ADAPTIVE", "", ""))
+    }
+    val dimension = carbonTable
+      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    results ++= getColumnGroups(dimension.asScala.toList)
+    results.map { case (name, dataType, comment) =>
+      Row(f"$name%-36s $dataType%-80s $comment%-72s")
+    }
+  }
+
+  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
+    var results: Seq[(String, String, String)] =
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
+    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
+      case (groupId, _) => groupId != -1
+    }.toSeq.sortBy(_._1)
+    val groups = groupedDimensions.map(colGroups => {
+      colGroups._2.map(dim => dim.getColName).mkString(", ")
+    })
+    var index = 1
+    groups.foreach { x =>
+      results = results :+ (s"Column Group $index", x, "")
+      index = index + 1
+    }
+    results
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9638b8f..f174126 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -187,6 +187,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     tables.nonEmpty
   }
 
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
   def loadMetadata(metadataPath: String, queryId: String): MetaData = {
     val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
     val statistic = new QueryStatistic()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..066acce
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+/**
+ * Session state implementation that overrides the SQL parser and registers Carbon strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+
+  experimentalMethods.extraStrategies =
+    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
new file mode 100644
index 0000000..028286c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.ShowLoadsCommand
+import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command._
+
+/**
+ * TODO: remove the duplicated code and move the common methods into a shared class.
+ * Parser for all Carbon DDL and DML statements in the unified Spark 2 context.
+ */
+class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
+
+  override def parse(input: String): LogicalPlan = {
+    synchronized {
+      // Initialize the keywords.
+      initLexical
+      phrase(start)(new lexical.Scanner(input)) match {
+        case Success(plan, _) => plan match {
+          case x: LoadTable =>
+            x.inputSqlString = input
+            x
+          case logicalPlan => logicalPlan
+        }
+        case failureOrError => sys.error(failureOrError.toString)
+      }
+    }
+  }
+
+
+  protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
+
+  protected lazy val startCommand: Parser[LogicalPlan] =
+    loadManagement | showLoads | alterTable
+
+  protected lazy val loadManagement: Parser[LogicalPlan] =
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+
+
+  protected lazy val alterTable: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
+      case dbName ~ table ~ (compact ~ compactType) =>
+        val alterTableModel = AlterTableModel(dbName, table, compactType, null)
+        AlterTableCompaction(alterTableModel)
+    }
+
+
+  protected lazy val loadDataNew: Parser[LogicalPlan] =
+    LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
+    (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case filePath ~ isOverwrite ~ table ~ optionsList =>
+        val (databaseNameOp, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+        if (optionsList.isDefined) {
+          validateOptions(optionsList)
+        }
+        val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+        LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+          isOverwrite.isDefined)
+    }
+
+  protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
+    DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
+                                                   (ident <~ ".").? ~ ident) <~
+    opt(";") ^^ {
+      case loadids ~ table => table match {
+        case databaseName ~ tableName =>
+          DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
+      }
+    }
+
+  protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
+    DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+    opt(";") ^^ {
+      case schema ~ table ~ condition =>
+        condition match {
+          case dateField ~ dateValue =>
+            DeleteLoadsByLoadDate(schema, table.toLowerCase(), dateField, dateValue)
+        }
+    }
+
+  protected lazy val cleanFiles: Parser[LogicalPlan] =
+    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
+      case databaseName ~ tableName => CleanFiles(databaseName, tableName.toLowerCase())
+    }
+
+  protected lazy val explainPlan: Parser[LogicalPlan] =
+    (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
+      case isExtended ~ logicalPlan =>
+        logicalPlan match {
+          case _: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
+          case _ => ExplainCommand(OneRowRelation)
+        }
+    }
+
+  protected lazy val showLoads: Parser[LogicalPlan] =
+    SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (LIMIT ~> numericLit).? <~
+    opt(";") ^^ {
+      case databaseName ~ tableName ~ limit =>
+        ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
+    }
+}
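
For reviewers, a small sketch of exercising this parser in isolation; the statements are derived from the grammar rules above, while table names, paths and option values are placeholders. Note that parse() only builds the logical plan and executes nothing.

// Sketch only: feeds statements matching the rules above to CarbonSpark2SqlParser.
// All identifiers, paths and option values are placeholders.
import org.apache.spark.sql.parser.CarbonSpark2SqlParser

object CarbonSpark2SqlParserExample {
  def main(args: Array[String]): Unit = {
    val parser = new CarbonSpark2SqlParser
    Seq(
      "LOAD DATA INPATH 'hdfs://host/data.csv' INTO TABLE db1.t1 OPTIONS('DELIMITER'=',')",
      "ALTER TABLE db1.t1 COMPACT 'MINOR'",
      "DELETE SEGMENT 0,1 FROM TABLE db1.t1",
      "DELETE SEGMENTS FROM TABLE db1.t1 WHERE STARTTIME BEFORE '2016-01-01 00:00:00'",
      "CLEAN FILES FOR TABLE db1.t1",
      "SHOW SEGMENTS FOR TABLE db1.t1 LIMIT 5"
    ).foreach { stmt =>
      // parse only constructs the logical plan; nothing is loaded or deleted here.
      println(parser.parse(stmt))
    }
  }
}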