Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:09 UTC

[13/50] [abbrv] carbondata git commit: [CARBONDATA-1884] Add CTAS support to carbondata

[CARBONDATA-1884] Add CTAS support to carbondata

Implemented the CTAS (CREATE TABLE AS SELECT) feature in CarbonData.

This closes #1665
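
With this change a CarbonData table can be created directly from the result of a
query. A minimal usage sketch in the style of the new test suite below (the table
names are illustrative, and a SparkSession with the Carbon extensions is assumed):

    // assumes `spark` is a SparkSession created through CarbonSession
    spark.sql("CREATE TABLE source_table(key INT, value STRING) STORED BY 'carbondata'")
    spark.sql("insert into source_table select 100,'spark'")
    // create a new table from a query; TBLPROPERTIES and the WHERE clause are optional
    spark.sql(
      "create table ctas_table stored by 'carbondata' " +
      "TBLPROPERTIES('sort_scope'='global_sort') " +
      "as select * from source_table where key=100")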


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59de7cdb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59de7cdb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59de7cdb

Branch: refs/heads/fgdatamap
Commit: 59de7cdbd234d921af1a4f535d240943778a775d
Parents: 54eedfe
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Dec 13 21:40:19 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Dec 19 14:43:57 2017 +0800

----------------------------------------------------------------------
 .../createTable/TestCreateTableAsSelect.scala   | 161 +++++++++++++++++++
 .../TestDataWithDicExcludeAndInclude.scala      |  11 --
 .../spark/util/CarbonReflectionUtils.scala      |   2 +-
 .../CarbonCreateTableAsSelectCommand.scala      | 113 +++++++++++++
 .../spark/sql/hive/CarbonFileMetastore.scala    |  44 +++++
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  28 +++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  56 +++++--
 .../src/main/spark2.1/CarbonSessionState.scala  |  26 +--
 .../src/main/spark2.2/CarbonSessionState.scala  |  26 +--
 9 files changed, 417 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
new file mode 100644
index 0000000..ffe6261
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+/**
+ * Test functionality of the create table as select command
+ */
+class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
+
+  private def createTablesAndInsertData {
+    // create carbon table and insert data
+    sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED by 'carbondata'")
+    sql("insert into carbon_ctas_test select 100,'spark'")
+    sql("insert into carbon_ctas_test select 200,'hive'")
+
+    // create parquet table and insert data
+    sql("CREATE TABLE parquet_ctas_test(key INT, value STRING) STORED as parquet")
+    sql("insert into parquet_ctas_test select 100,'spark'")
+    sql("insert into parquet_ctas_test select 200,'hive'")
+
+    // create hive table and insert data
+    sql("CREATE TABLE orc_ctas_test(key INT, value STRING) STORED as ORC")
+    sql("insert into orc_ctas_test select 100,'spark'")
+    sql("insert into orc_ctas_test select 200,'hive'")
+  }
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS carbon_ctas_test")
+    sql("DROP TABLE IF EXISTS parquet_ctas_test")
+    sql("DROP TABLE IF EXISTS orc_ctas_test")
+    createTablesAndInsertData
+  }
+
+  test("test create table as select with select from same table name when table exists") {
+    sql("drop table if exists ctas_same_table_name")
+    sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
+    intercept[Exception] {
+      sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+    }
+  }
+
+  test("test create table as select with select from same table name when table does not exists") {
+    sql("drop table if exists ctas_same_table_name")
+    intercept[Exception] {
+      sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+    }
+  }
+
+  test("test create table as select with select from same table name with if not exists clause") {
+    sql("drop table if exists ctas_same_table_name")
+    sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
+    sql("create table if not exists ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+    assert(true)
+  }
+
+  test("test create table as select with select from another carbon table") {
+    sql("DROP TABLE IF EXISTS ctas_select_carbon")
+    sql("create table ctas_select_carbon stored by 'carbondata' as select * from carbon_ctas_test")
+    checkAnswer(sql("select * from ctas_select_carbon"), sql("select * from carbon_ctas_test"))
+  }
+
+  test("test create table as select with select from another parquet table") {
+    sql("DROP TABLE IF EXISTS ctas_select_parquet")
+    sql("create table ctas_select_parquet stored by 'carbondata' as select * from parquet_ctas_test")
+    checkAnswer(sql("select * from ctas_select_parquet"), sql("select * from parquet_ctas_test"))
+  }
+
+  test("test create table as select with select from another hive/orc table") {
+    sql("DROP TABLE IF EXISTS ctas_select_orc")
+    sql("create table ctas_select_orc stored by 'carbondata' as select * from orc_ctas_test")
+    checkAnswer(sql("select * from ctas_select_orc"), sql("select * from orc_ctas_test"))
+  }
+
+  test("test create table as select with where clause in select from carbon table that returns data") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+    sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100")
+    checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=100"))
+  }
+
+  test(
+    "test create table as select with where clause in select from carbon table that does not return data") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+    sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=300")
+    checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=300"))
+  }
+
+  test("test create table as select with where clause in select from carbon table and load again") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+    sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100")
+    sql("insert into ctas_select_where_carbon select 200,'hive'")
+    checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test"))
+  }
+
+  test("test create table as select with where clause in select from parquet table") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+    sql("create table ctas_select_where_parquet stored by 'carbondata' as select * from parquet_ctas_test where key=100")
+    checkAnswer(sql("select * from ctas_select_where_parquet"), sql("select * from parquet_ctas_test where key=100"))
+  }
+
+  test("test create table as select with where clause in select from hive/orc table") {
+    sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+    sql("create table ctas_select_where_orc stored by 'carbondata' as select * from orc_ctas_test where key=100")
+    checkAnswer(sql("select * from ctas_select_where_orc"), sql("select * from orc_ctas_test where key=100"))
+  }
+
+  test("test create table as select with select directly having the data") {
+    sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+    sql("create table ctas_select_direct_data stored by 'carbondata' as select 300,'carbondata'")
+    checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300,"carbondata")))
+  }
+
+  test("test create table as select with TBLPROPERTIES") {
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql(
+      "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+      "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
+    checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+    val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
+      .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+      .asInstanceOf[CarbonRelation].carbonTable
+    val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetaDataFilepath)
+    assert(metadataFolderPath.exists())
+    val dictFiles: Array[CarbonFile] = metadataFolderPath.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.contains(".dict") || file.getName.contains(".sortindex")
+      }
+    })
+    assert(dictFiles.length == 3)
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS carbon_ctas_test")
+    sql("DROP TABLE IF EXISTS parquet_ctas_test")
+    sql("DROP TABLE IF EXISTS orc_ctas_test")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index 484c304..c788857 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@ -100,17 +100,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA
     ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE"))
   }
 
-  test("test CTAS should fail") {
-    assert(intercept[AnalysisException](
-      sql(
-        """
-          | CREATE TABLE t1 (id string, value int)
-          | STORED BY 'carbondata'
-          | AS SELECT 'ABC', 1 FROM t2
-        """.stripMargin)
-    ).message.contains("Operation not allowed: CREATE TABLE AS SELECT"))
-  }
-
   override def afterAll {
     dropTable
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index ba51077..19b967a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -189,7 +189,7 @@ object CarbonReflectionUtils {
       createObject(
         "org.apache.spark.sql.hive.CarbonSqlAstBuilder",
         conf,
-        sqlParser)._1.asInstanceOf[AstBuilder]
+        sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
     } else {
       throw new UnsupportedOperationException("Spark version not supported")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
new file mode 100644
index 0000000..26a8f6f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableInfo the table description, which may contain serde, storage handler, etc.
+ * @param query the query whose result will be inserted into the new relation
+ * @param ifNotExistsSet allow the command to continue if the table already exists,
+ *                      otherwise raise an exception
+ * @param tableLocation store location where the table needs to be created
+ */
+case class CarbonCreateTableAsSelectCommand(
+    tableInfo: TableInfo,
+    query: LogicalPlan,
+    ifNotExistsSet: Boolean = false,
+    tableLocation: Option[String] = None) extends AtomicRunnableCommand {
+
+  /**
+   * Flag used by the insert into command to check whether the
+   * table was newly created or already existed
+   */
+  var isTableCreated: Boolean = false
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = tableInfo.getFactTable.getTableName
+    var databaseOpt: Option[String] = None
+    if (tableInfo.getDatabaseName != null) {
+      databaseOpt = Some(tableInfo.getDatabaseName)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+    // check if table already exists
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      if (!ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+          s"Table [$tableName] already exists under database [$dbName]")
+        throw new TableAlreadyExistsException(dbName, tableName)
+      }
+    } else {
+      // execute command to create carbon table
+      CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession)
+      isTableCreated = true
+    }
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isTableCreated) {
+      val tableName = tableInfo.getFactTable.getTableName
+      var databaseOpt: Option[String] = None
+      if (tableInfo.getDatabaseName != null) {
+        databaseOpt = Some(tableInfo.getDatabaseName)
+      }
+      val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+      val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .createCarbonDataSourceHadoopRelation(sparkSession,
+          TableIdentifier(tableName, Option(dbName)))
+      // execute command to load data into carbon table
+      CarbonInsertIntoCommand(
+        carbonDataSourceHadoopRelation,
+        query,
+        overwrite = false,
+        partition = Map.empty).run(sparkSession)
+      LOGGER.audit(s"CTAS operation completed successfully for $dbName.$tableName")
+    }
+    Seq.empty
+  }
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    val tableName = tableInfo.getFactTable.getTableName
+    var databaseOpt: Option[String] = None
+    if (tableInfo.getDatabaseName != null) {
+      databaseOpt = Some(tableInfo.getDatabaseName)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    // drop the created table.
+    CarbonDropTableCommand(
+      ifExistsSet = false,
+      Option(dbName), tableName).run(sparkSession)
+    Seq.empty
+  }
+}
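
For reference, the command above follows the AtomicRunnableCommand contract:
processMetadata creates the table first, processData then performs the insert, and
undoMetadata is the rollback hook. A simplified, hypothetical sketch of the order in
which the framework drives these methods (the driver loop itself is illustrative;
only the method names are taken from the code above):

    val cmd = CarbonCreateTableAsSelectCommand(tableInfo, query)
    try {
      cmd.processMetadata(spark)   // creates the table if it does not already exist
      cmd.processData(spark)       // runs CarbonInsertIntoCommand with the CTAS query
    } catch {
      case e: Exception =>
        cmd.undoMetadata(spark, e) // drops the newly created table on failure
        throw e
    }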

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index f7a1eed..ba222e2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import java.net.URI
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 
@@ -528,4 +529,47 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableMetadataFile = tablePath.getSchemaFilePath
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
+
+  override def createCarbonDataSourceHadoopRelation(
+      sparkSession: SparkSession,
+      tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation = {
+    val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
+    relation match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+        carbonDataSourceHadoopRelation
+      case LogicalRelation(
+      carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDataSourceHadoopRelation
+      case SubqueryAlias(_, c)
+        if SPARK_VERSION.startsWith("2.2") &&
+           (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+            c.getClass.getName
+              .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+            c.getClass.getName.equals(
+              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
+        catalogTable.provider match {
+          case Some(name) if name.equals("org.apache.spark.sql.CarbonSource") => name
+          case _ =>
+            throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+        }
+        val tableLocation = catalogTable.storage.locationUri match {
+          case tableLoc@Some(uri) =>
+            if (tableLoc.get.isInstanceOf[String]) {
+              FileFactory.getUpdatedFilePath(tableLoc.get.asInstanceOf[String])
+            } else if (tableLoc.get.isInstanceOf[URI]) {
+              FileFactory.getUpdatedFilePath(tableLoc.get.asInstanceOf[URI].getPath)
+            }
+          case None =>
+            CarbonEnv.getTablePath(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+        }
+        CarbonDatasourceHadoopRelation(sparkSession,
+          Array(tableLocation.asInstanceOf[String]),
+          catalogTable.storage.properties,
+          Option(catalogTable.schema))
+      case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 5a8e58b..cc0e6ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -16,9 +16,10 @@
  */
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame, Dataset, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -144,6 +145,31 @@ trait CarbonMetaStore {
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
+
+  /**
+   * Method will be used to retrieve or create carbon data source relation
+   *
+   * @param sparkSession
+   * @param tableIdentifier
+   * @return
+   */
+  def createCarbonDataSourceHadoopRelation(
+      sparkSession: SparkSession,
+      tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation
+
+  /**
+   * Method will be used to retrieve the schema from an unresolved relation
+   *
+   * @param sparkSession
+   * @param query
+   * @return
+   */
+  def getSchemaFromUnresolvedRelation(
+      sparkSession: SparkSession,
+      query: LogicalPlan): StructType = {
+    val df: DataFrame = Dataset.ofRows(sparkSession, query)
+    df.schema
+  }
 }
 /**
  * Factory for Carbon metastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 79095ca..3597208 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -26,14 +26,12 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
 import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -78,7 +76,9 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   }
 }
 
-class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
+class CarbonHelperSqlAstBuilder(conf: SQLConf,
+    parser: CarbonSpark2SqlParser,
+    sparkSession: SparkSession)
   extends SparkSqlAstBuilder(conf) {
 
   def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
@@ -147,7 +147,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       tablePropertyList : TablePropertyListContext,
       locationSpecContext: SqlBaseParser.LocationSpecContext,
       tableComment : Option[String],
-      ctas: TerminalNode) : LogicalPlan = {
+      ctas: TerminalNode,
+      query: QueryContext) : LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
     val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
@@ -166,9 +167,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader)
     }
-    if (ctas != null && columns != null) {
-      operationNotAllowed("CREATE TABLE AS SELECT", tableHeader)
-    }
 
     val cols = Option(columns).toSeq.flatMap(visitColTypeList)
     val properties = getPropertyKeyValues(tablePropertyList)
@@ -210,13 +208,33 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       }
     }
 
-    val fields = parser.getFields(cols ++ partitionByStructFields)
     val options = new CarbonOption(properties)
+    // validate streaming property
+    validateStreamingProperty(options)
+    var fields = parser.getFields(cols ++ partitionByStructFields)
+    // validate for create table as select
+    val selectQuery = Option(query).map(plan)
+    selectQuery match {
+      case Some(q) =>
+        // create table as select does not allow creation of partitioned table
+        if (partitionFields.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+                             "create a partitioned table using Carbondata file formats."
+          operationNotAllowed(errorMessage, partitionColumns)
+        }
+        // create table as select does not allow explicitly specifying a schema
+        if (fields.nonEmpty) {
+          operationNotAllowed(
+            "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
+        }
+        fields = parser
+          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
+      case _ =>
+        // ignore this case
+    }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-
-    validateStreamingProperty(options)
-
     // prepare table model of the collected tokens
     val tableModel: TableModel = parser.prepareTableModel(
       ifNotExists,
@@ -228,7 +246,19 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       bucketFields,
       isAlterFlow = false,
       tableComment)
-    CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, tablePath)
+
+    selectQuery match {
+      case query@Some(q) =>
+        CarbonCreateTableAsSelectCommand(
+          TableNewProcessor(tableModel),
+          query.get,
+          tableModel.ifNotExistsSet,
+          tablePath)
+      case _ =>
+        CarbonCreateTableCommand(TableNewProcessor(tableModel),
+          tableModel.ifNotExistsSet,
+          tablePath)
+    }
   }
 
   private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
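
Note the two CTAS restrictions enforced here: when an AS SELECT clause is present,
an explicit column list and PARTITIONED BY are both rejected, and the schema is
instead derived from the select query via getSchemaFromUnresolvedRelation. For
example, statements of the following shape (illustrative) fail with an
operation-not-allowed error:

    // rejected: explicit schema together with AS SELECT
    spark.sql("create table t1(id string, value int) stored by 'carbondata' " +
      "as select * from source_table")
    // rejected: partitioned CTAS with the carbondata format
    spark.sql("create table t2 stored by 'carbondata' " +
      "partitioned by (value int) as select key from source_table")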

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index c198613..b55a6aa 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -259,25 +259,27 @@ object CarbonOptimizerUtil {
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(ctx.createTableHeader,
-          ctx.skewSpec,
-          ctx.bucketSpec,
-          ctx.partitionColumns,
-          ctx.columns,
-          ctx.tablePropertyList,
-          ctx.locationSpec,
-          Option(ctx.STRING()).map(string),
-          ctx.AS)
+      helper.createCarbonTable(
+        tableHeader = ctx.createTableHeader,
+        skewSpecContext = ctx.skewSpec,
+        bucketSpecContext = ctx.bucketSpec,
+        partitionColumns = ctx.partitionColumns,
+        columns = ctx.columns,
+        tablePropertyList = ctx.tablePropertyList,
+        locationSpecContext = ctx.locationSpec(),
+        tableComment = Option(ctx.STRING()).map(string),
+        ctas = ctx.AS,
+        query = ctx.query)
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 8acddfa..c951e5e 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -280,25 +280,27 @@ class CarbonOptimizer(
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(ctx.createTableHeader,
-          ctx.skewSpec,
-          ctx.bucketSpec,
-          ctx.partitionColumns,
-          ctx.columns,
-          ctx.tablePropertyList,
-          ctx.locationSpec(),
-          Option(ctx.STRING()).map(string),
-          ctx.AS)
+      helper.createCarbonTable(
+        tableHeader = ctx.createTableHeader,
+        skewSpecContext = ctx.skewSpec,
+        bucketSpecContext = ctx.bucketSpec,
+        partitionColumns = ctx.partitionColumns,
+        columns = ctx.columns,
+        tablePropertyList = ctx.tablePropertyList,
+        locationSpecContext = ctx.locationSpec(),
+        tableComment = Option(ctx.STRING()).map(string),
+        ctas = ctx.AS,
+        query = ctx.query)
     } else {
       super.visitCreateHiveTable(ctx)
     }