You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/03 08:45:42 UTC

carbondata git commit: [CARBONDATA-2396] Add CTAS support for using DataSource syntax

Repository: carbondata
Updated Branches:
  refs/heads/master f7c0670cd -> 167b6c004


[CARBONDATA-2396] Add CTAS support for using DataSource syntax

Implemented CTAS feature to support DataSource syntax ("using carbondata")

This closes #2225


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/167b6c00
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/167b6c00
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/167b6c00

Branch: refs/heads/master
Commit: 167b6c00402963c9cea278f6246fdd21e3a1987d
Parents: f7c0670
Author: Indhumathi27 <in...@gmail.com>
Authored: Tue Apr 24 20:43:19 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu May 3 14:18:08 2018 +0530

----------------------------------------------------------------------
 .../sql/commands/UsingCarbondataSuite.scala     |  74 +++++++-
 .../org/apache/spark/sql/CarbonSource.scala     |  38 +++--
 .../sql/execution/strategy/DDLStrategy.scala    |  21 ++-
 ...CreateCarbonSourceTableAsSelectCommand.scala | 167 +++++++++++++++++++
 ...CreateCarbonSourceTableAsSelectCommand.scala | 122 ++++++++++++++
 5 files changed, 406 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
index 37d65b4..097c9d9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.sql.commands
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
@@ -28,11 +28,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
   override def beforeEach(): Unit = {
     sql("DROP TABLE IF EXISTS src_carbondata1")
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
     sql("DROP TABLE IF EXISTS tableSize3")
   }
 
   override def afterEach(): Unit = {
     sql("DROP TABLE IF EXISTS src_carbondata1")
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
     sql("DROP TABLE IF EXISTS tableSize3")
   }
 
@@ -69,4 +73,72 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
     res3.foreach(row => assert(row.getString(1).trim.toLong > 0))
   }
 
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") {
+    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3")
+    checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'USING org.apache.spark.sql.CarbonSource'") {
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
+    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING org.apache.spark.sql.CarbonSource")
+    sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata4 USING org.apache.spark.sql.CarbonSource as select * from src_carbondata3")
+    checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select [IF NOT EXISTS] with 'using carbondata'") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    sql(
+      "CREATE TABLE IF NOT EXISTS src_carbondata6 USING carbondata as select * from " +
+      "src_carbondata5")
+    checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Table properties") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata6  USING carbondata options('table_blocksize'='10'," +
+      "'sort_scope'='local_sort') as select * from src_carbondata5")
+    val result = sql("describe FORMATTED src_carbondata6")
+    checkExistence(result, true, "Table Block Size")
+    checkExistence(result, true, "10 MB")
+    checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Columns") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    val exception = intercept[AnalysisException](
+      sql(
+        "CREATE TABLE src_carbondata6(name String) USING carbondata as select * from " +
+        "src_carbondata5"))
+    assert(exception.getMessage
+      .contains(
+        "Operation not allowed: Schema may not be specified in a Create Table As Select (CTAS) " +
+        "statement"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3600854..8376136 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.streaming.Sink
@@ -87,12 +88,6 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       data: DataFrame): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     var newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
-    // User should not specify path since only one store is supported in carbon currently,
-    // after we support multi-store, we can remove this limitation
-    require(!newParameters.contains("path"), "'path' should not be specified, " +
-                                          "the path to store carbon file is the 'storePath' " +
-                                          "specified when creating CarbonContext")
-
     val options = new CarbonOption(newParameters)
     val isExists = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.tableExists(
       options.tableName, options.dbName)(sqlContext.sparkSession)
@@ -181,7 +176,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
           dbName,
           tableName)
         val updatedParams = CarbonSource.updateAndCreateTable(
-          identifier, dataSchema, sparkSession, metaStore, parameters)
+          identifier, dataSchema, sparkSession, metaStore, parameters, None)
         (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -273,12 +268,26 @@ object CarbonSource {
   def createTableInfoFromParams(
       parameters: Map[String, String],
       dataSchema: StructType,
-      identifier: AbsoluteTableIdentifier): TableModel = {
+      identifier: AbsoluteTableIdentifier,
+      query: Option[LogicalPlan],
+      sparkSession: SparkSession): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
-    val fields = sqlParser.getFields(dataSchema)
     val map = scala.collection.mutable.Map[String, String]()
     parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
     val options = new CarbonOption(parameters)
+    val fields = query match {
+      case Some(q) =>
+        // if query is provided then it is a CTAS flow
+        if (sqlParser.getFields(dataSchema).nonEmpty) {
+          throw new AnalysisException(
+            "Schema cannot be specified in a Create Table As Select (CTAS) statement")
+        }
+        sqlParser
+          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .getSchemaFromUnresolvedRelation(sparkSession, q))
+      case None =>
+        sqlParser.getFields(dataSchema)
+    }
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
     sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
       identifier.getTableName, fields, Nil, map, bucketFields)
@@ -292,7 +301,8 @@ object CarbonSource {
    */
   def updateCatalogTableWithCarbonSchema(
       tableDesc: CatalogTable,
-      sparkSession: SparkSession): CatalogTable = {
+      sparkSession: SparkSession,
+      query: Option[LogicalPlan] = None): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
@@ -306,7 +316,8 @@ object CarbonSource {
         tableDesc.schema,
         sparkSession,
         metaStore,
-        properties)
+        properties,
+        query)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map)
       tableDesc.copy(storage = updatedFormat)
@@ -334,8 +345,9 @@ object CarbonSource {
       dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
-      properties: Map[String, String]): Map[String, String] = {
-    val model = createTableInfoFromParams(properties, dataSchema, identifier)
+      properties: Map[String, String],
+      query: Option[LogicalPlan]): Map[String, String] = {
+    val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
     val tableInfo: TableInfo = TableNewProcessor(model)
     val isExternal = properties.getOrElse("isExternal", "false")
     val isTransactionalTable = properties.getOrElse("isTransactional", "true")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d7e2023..ef4d05c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedComm
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
@@ -228,11 +228,28 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd =
           CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
+      case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+           && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+               || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, Option(query))
+        val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query)
+        ExecutedCommandExec(cmd) :: Nil
+      case cmd@org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+           && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+               || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, query)
+        val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query.get)
+        ExecutedCommandExec(cmd) :: Nil
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if table.provider.get != DDLUtils.HIVE_PROVIDER
           && (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
           || table.provider.get.equalsIgnoreCase("carbondata")) =>
-        val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(table, sparkSession)
         val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..989b1d5
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,167 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+  CatalogRelation, CatalogTable, CatalogTableType,
+  SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+  AlterTableRecoverPartitionsCommand, DDLUtils,
+  RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  /**
+   * Applies `mode` to an already existing table, writes the result of `query` through the
+   * data source and refreshes the table in the session catalog.
+   *
+   * @return always an empty sequence of rows
+   */
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    assert(table.schema.isEmpty)
+
+    val provider = table.provider.get
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    var existingSchema = Option.empty[StructType]
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+      // Check if we need to throw an exception or just return.
+      mode match {
+        case SaveMode.ErrorIfExists =>
+          throw new AnalysisException(s"Table $tableName already exists. " +
+                                      s"If you are using saveAsTable, you can set SaveMode to " +
+                                      s"SaveMode.Append to " +
+                                      s"insert data into the table or set SaveMode to SaveMode" +
+                                      s".Overwrite to overwrite " +
+                                      s"the existing data. " +
+                                      s"Or, if you are using SQL CREATE TABLE, you need to drop " +
+                                      s"$tableName first.")
+        case SaveMode.Ignore =>
+          // Since the table already exists and the save mode is Ignore, we will just return.
+          return Seq.empty[Row]
+        case SaveMode.Append =>
+          // Check if the specified data source match the data source of the existing table.
+          val existingProvider = DataSource.lookupDataSource(provider)
+          // TODO: Check that options from the resolved relation match the relation that we are
+          // inserting into (i.e. using the same compression).
+
+          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+          // views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+            case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+              // check if the file formats match
+              l.relation match {
+                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
+                  throw new AnalysisException(
+                    s"The file format of the existing table $tableName is " +
+                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
+                    s"format `$provider`")
+                case _ =>
+              }
+              if (query.schema.size != l.schema.size) {
+                throw new AnalysisException(
+                  s"The column number of the existing schema[${ l.schema }] " +
+                  s"doesn't match the data schema[${ query.schema }]'s")
+              }
+              existingSchema = Some(l.schema)
+            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+              existingSchema = Some(s.metadata.schema)
+            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
+              throw new AnalysisException("Saving data in the Hive serde table " +
+                                          s"${ c.catalogTable.identifier } is not supported yet. " +
+                                          s"Please use the insertInto() API as an alternative..")
+            case o =>
+              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
+          }
+        case SaveMode.Overwrite =>
+          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+      }
+    }
+
+    val data = Dataset.ofRows(sparkSession, query)
+    val df = existingSchema match {
+      // If we are inserting into an existing table, just use the existing schema.
+      case Some(s) => data.selectExpr(s.fieldNames: _*)
+      case None => data
+    }
+
+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
+    // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
+
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table $tableName in $mode mode", ex)
+        throw ex
+    }
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                   sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
+    // Refresh the cache of the table in the catalog.
+    sessionState.catalog.refreshTable(tableIdentWithDB)
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..4e22d13
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ */
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+
+      // ErrorIfExists and Ignore were handled above, so the data is appended to the table.
+      saveDataIntoTable(
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+    } else {
+      assert(table.schema.isEmpty)
+
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      // The table does not exist yet: write with Overwrite and without a catalog table.
+      val result = saveDataIntoTable(
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+      result match {
+        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          // Need to recover partitions into the metastore so our saved data is visible.
+          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        case _ =>
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  /**
+   * Writes the result of the logical plan `data` to the table through its data source and
+   * returns the relation produced by `writeAndRead`; an AnalysisException is logged and rethrown.
+   */
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[URI],
+      data: LogicalPlan,
+      mode: SaveMode,
+      tableExists: Boolean): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = if (tableExists) Some(table) else None)
+
+    try {
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, data))
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+        throw ex
+    }
+  }
+}