Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/03 08:45:42 UTC
carbondata git commit: [CARBONDATA-2396] Add CTAS support for using DataSource syntax
Repository: carbondata
Updated Branches:
refs/heads/master f7c0670cd -> 167b6c004
[CARBONDATA-2396] Add CTAS support for using DataSource syntax
Implemented the CTAS (CREATE TABLE AS SELECT) feature to support the DataSource syntax ("USING carbondata")
This closes #2225
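In short, statements like the following now work with the carbondata provider (a minimal sketch adapted from the tests added below; the table names are illustrative):

    CREATE TABLE source_tbl(key INT, value STRING) USING carbondata;
    INSERT INTO source_tbl VALUES(1, 'source');

    -- CTAS with the DataSource syntax, optionally with table properties:
    CREATE TABLE target_tbl USING carbondata
      OPTIONS('table_blocksize'='10', 'sort_scope'='local_sort')
      AS SELECT * FROM source_tbl;

Specifying an explicit schema together with CTAS is rejected with an AnalysisException, as covered by the new tests.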
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/167b6c00
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/167b6c00
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/167b6c00
Branch: refs/heads/master
Commit: 167b6c00402963c9cea278f6246fdd21e3a1987d
Parents: f7c0670
Author: Indhumathi27 <in...@gmail.com>
Authored: Tue Apr 24 20:43:19 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu May 3 14:18:08 2018 +0530
----------------------------------------------------------------------
.../sql/commands/UsingCarbondataSuite.scala | 74 +++++++-
.../org/apache/spark/sql/CarbonSource.scala | 38 +++--
.../sql/execution/strategy/DDLStrategy.scala | 21 ++-
...CreateCarbonSourceTableAsSelectCommand.scala | 167 +++++++++++++++++++
...CreateCarbonSourceTableAsSelectCommand.scala | 122 ++++++++++++++
5 files changed, 406 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
index 37d65b4..097c9d9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.sql.commands
import scala.collection.mutable
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach
@@ -28,11 +28,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
override def beforeEach(): Unit = {
sql("DROP TABLE IF EXISTS src_carbondata1")
+ sql("DROP TABLE IF EXISTS src_carbondata3")
+ sql("DROP TABLE IF EXISTS src_carbondata4")
sql("DROP TABLE IF EXISTS tableSize3")
}
override def afterEach(): Unit = {
sql("DROP TABLE IF EXISTS src_carbondata1")
+ sql("DROP TABLE IF EXISTS src_carbondata3")
+ sql("DROP TABLE IF EXISTS src_carbondata4")
sql("DROP TABLE IF EXISTS tableSize3")
}
@@ -69,4 +73,72 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
res3.foreach(row => assert(row.getString(1).trim.toLong > 0))
}
+ test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") {
+ sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata")
+ sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+ checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+ sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3")
+ checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+ }
+
+ test("CARBONDATA-2396 Support Create Table As Select with 'USING org.apache.spark.sql.CarbonSource'") {
+ sql("DROP TABLE IF EXISTS src_carbondata3")
+ sql("DROP TABLE IF EXISTS src_carbondata4")
+ sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING org.apache.spark.sql.CarbonSource")
+ sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+ checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+ sql("CREATE TABLE src_carbondata4 USING org.apache.spark.sql.CarbonSource as select * from src_carbondata3")
+ checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+ sql("DROP TABLE IF EXISTS src_carbondata3")
+ sql("DROP TABLE IF EXISTS src_carbondata4")
+ }
+
+ test("CARBONDATA-2396 Support Create Table As Select [IF NOT EXISTS] with 'using carbondata'") {
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+ sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+ checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+ sql(
+ "CREATE TABLE IF NOT EXISTS src_carbondata6 USING carbondata as select * from " +
+ "src_carbondata5")
+ checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ }
+
+ test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Table properties") {
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+ sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+ checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+ sql("CREATE TABLE src_carbondata6 USING carbondata options('table_blocksize'='10'," +
+ "'sort_scope'='local_sort') as select * from src_carbondata5")
+ val result = sql("describe FORMATTED src_carbondata6")
+ checkExistence(result, true, "Table Block Size")
+ checkExistence(result, true, "10 MB")
+ checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ }
+
+ test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Columns") {
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+ sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+ checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+ val exception = intercept[AnalysisException](
+ sql(
+ "CREATE TABLE src_carbondata6(name String) USING carbondata as select * from " +
+ "src_carbondata5"))
+ assert(exception.getMessage
+ .contains(
+ "Operation not allowed: Schema may not be specified in a Create Table As Select (CTAS) " +
+ "statement"))
+ sql("DROP TABLE IF EXISTS src_carbondata5")
+ sql("DROP TABLE IF EXISTS src_carbondata6")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3600854..8376136 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
import org.apache.spark.sql.execution.streaming.Sink
@@ -87,12 +88,6 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
data: DataFrame): BaseRelation = {
CarbonEnv.getInstance(sqlContext.sparkSession)
var newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
- // User should not specify path since only one store is supported in carbon currently,
- // after we support multi-store, we can remove this limitation
- require(!newParameters.contains("path"), "'path' should not be specified, " +
- "the path to store carbon file is the 'storePath' " +
- "specified when creating CarbonContext")
-
val options = new CarbonOption(newParameters)
val isExists = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.tableExists(
options.tableName, options.dbName)(sqlContext.sparkSession)
@@ -181,7 +176,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
dbName,
tableName)
val updatedParams = CarbonSource.updateAndCreateTable(
- identifier, dataSchema, sparkSession, metaStore, parameters)
+ identifier, dataSchema, sparkSession, metaStore, parameters, None)
(CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -273,12 +268,26 @@ object CarbonSource {
def createTableInfoFromParams(
parameters: Map[String, String],
dataSchema: StructType,
- identifier: AbsoluteTableIdentifier): TableModel = {
+ identifier: AbsoluteTableIdentifier,
+ query: Option[LogicalPlan],
+ sparkSession: SparkSession): TableModel = {
val sqlParser = new CarbonSpark2SqlParser
- val fields = sqlParser.getFields(dataSchema)
val map = scala.collection.mutable.Map[String, String]()
parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
val options = new CarbonOption(parameters)
+ val fields = query match {
+ case Some(q) =>
+ // if query is provided then it is a CTAS flow
+ if (sqlParser.getFields(dataSchema).nonEmpty) {
+ throw new AnalysisException(
+ "Schema cannot be specified in a Create Table As Select (CTAS) statement")
+ }
+ sqlParser
+ .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .getSchemaFromUnresolvedRelation(sparkSession, q))
+ case None =>
+ sqlParser.getFields(dataSchema)
+ }
val bucketFields = sqlParser.getBucketFields(map, fields, options)
sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
identifier.getTableName, fields, Nil, map, bucketFields)
@@ -292,7 +301,8 @@ object CarbonSource {
*/
def updateCatalogTableWithCarbonSchema(
tableDesc: CatalogTable,
- sparkSession: SparkSession): CatalogTable = {
+ sparkSession: SparkSession,
+ query: Option[LogicalPlan] = None): CatalogTable = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val storageFormat = tableDesc.storage
val properties = storageFormat.properties
@@ -306,7 +316,8 @@ object CarbonSource {
tableDesc.schema,
sparkSession,
metaStore,
- properties)
+ properties,
+ query)
// updating params
val updatedFormat = storageFormat.copy(properties = map)
tableDesc.copy(storage = updatedFormat)
@@ -334,8 +345,9 @@ object CarbonSource {
dataSchema: StructType,
sparkSession: SparkSession,
metaStore: CarbonMetaStore,
- properties: Map[String, String]): Map[String, String] = {
- val model = createTableInfoFromParams(properties, dataSchema, identifier)
+ properties: Map[String, String],
+ query: Option[LogicalPlan]): Map[String, String] = {
+ val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
val tableInfo: TableInfo = TableNewProcessor(model)
val isExternal = properties.getOrElse("isExternal", "false")
val isTransactionalTable = properties.getOrElse("isTransactional", "true")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d7e2023..ef4d05c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedComm
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
@@ -228,11 +228,28 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
val cmd =
CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
+ case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+ if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+ && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+ || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+ val updatedCatalog = CarbonSource
+ .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, Option(query))
+ val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query)
+ ExecutedCommandExec(cmd) :: Nil
+ case cmd@org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
+ if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+ && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+ || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+ val updatedCatalog = CarbonSource
+ .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, query)
+ val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query.get)
+ ExecutedCommandExec(cmd) :: Nil
case CreateDataSourceTableCommand(table, ignoreIfExists)
if table.provider.get != DDLUtils.HIVE_PROVIDER
&& (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
|| table.provider.get.equalsIgnoreCase("carbondata")) =>
- val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+ val updatedCatalog = CarbonSource
+ .updateCatalogTableWithCarbonSchema(table, sparkSession)
val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
ExecutedCommandExec(cmd) :: Nil
case AlterTableSetPropertiesCommand(tableName, properties, isView)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..989b1d5
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,167 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+ CatalogRelation, CatalogTable, CatalogTableType,
+ SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+ AlterTableRecoverPartitionsCommand, DDLUtils,
+ RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Creates a table 'using carbondata' and inserts the query result into it.
+ *
+ * @param table the catalog table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+ table: CatalogTable,
+ mode: SaveMode,
+ query: LogicalPlan)
+ extends RunnableCommand {
+
+ override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
+ assert(table.schema.isEmpty)
+
+ val provider = table.provider.get
+ val sessionState = sparkSession.sessionState
+ val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
+
+ var createMetastoreTable = false
+ var existingSchema = Option.empty[StructType]
+ if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+ // Check if we need to throw an exception or just return.
+ mode match {
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(s"Table $tableName already exists. " +
+ s"If you are using saveAsTable, you can set SaveMode to " +
+ s"SaveMode.Append to " +
+ s"insert data into the table or set SaveMode to SaveMode" +
+ s".Overwrite to overwrite" +
+ s"the existing data. " +
+ s"Or, if you are using SQL CREATE TABLE, you need to drop " +
+ s"$tableName first.")
+ case SaveMode.Ignore =>
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty[Row]
+ case SaveMode.Append =>
+ // Check if the specified data source match the data source of the existing table.
+ val existingProvider = DataSource.lookupDataSource(provider)
+ // TODO: Check that options from the resolved relation match the relation that we are
+ // inserting into (i.e. using the same compression).
+
+ // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+ // views unexpectedly.
+ EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+ case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+ // check if the file formats match
+ l.relation match {
+ case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
+ throw new AnalysisException(
+ s"The file format of the existing table $tableName is " +
+ s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
+ s"format `$provider`")
+ case _ =>
+ }
+ if (query.schema.size != l.schema.size) {
+ throw new AnalysisException(
+ s"The column number of the existing schema[${ l.schema }] " +
+ s"doesn't match the data schema[${ query.schema }]'s")
+ }
+ existingSchema = Some(l.schema)
+ case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+ existingSchema = Some(s.metadata.schema)
+ case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
+ throw new AnalysisException("Saving data in the Hive serde table " +
+ s"${ c.catalogTable.identifier } is not supported yet. " +
+ s"Please use the insertInto() API as an alternative..")
+ case o =>
+ throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
+ }
+ case SaveMode.Overwrite =>
+ sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+ // Need to create the table again.
+ createMetastoreTable = true
+ }
+ } else {
+ // The table does not exist. We need to create it in metastore.
+ createMetastoreTable = true
+ }
+
+ val data = Dataset.ofRows(sparkSession, query)
+ val df = existingSchema match {
+ // If we are inserting into an existing table, just use the existing schema.
+ case Some(s) => data.selectExpr(s.fieldNames: _*)
+ case None => data
+ }
+
+ val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+ Some(sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.locationUri
+ }
+
+ // Create the relation based on the data of df.
+ val pathOption = tableLocation.map("path" -> _)
+ val dataSource = DataSource(
+ sparkSession,
+ className = provider,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
+
+ val result = try {
+ dataSource.write(mode, df)
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table $tableName in $mode mode", ex)
+ throw ex
+ }
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
+
+ // Refresh the cache of the table in the catalog.
+ sessionState.catalog.refreshTable(tableIdentWithDB)
+ Seq.empty[Row]
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/167b6c00/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 0000000..4e22d13
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Creates a table 'using carbondata' and inserts the query result into it.
+ *
+ * @param table the catalog table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+ table: CatalogTable,
+ mode: SaveMode,
+ query: LogicalPlan)
+ extends RunnableCommand {
+
+ override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
+
+ val sessionState = sparkSession.sessionState
+ val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val tableIdentWithDB = table.identifier.copy(database = Some(db))
+ val tableName = tableIdentWithDB.unquotedString
+
+ if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+ assert(mode != SaveMode.Overwrite,
+ s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+ if (mode == SaveMode.ErrorIfExists) {
+ throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+ }
+ if (mode == SaveMode.Ignore) {
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty
+ }
+
+ saveDataIntoTable(
+ sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+ } else {
+ assert(table.schema.isEmpty)
+
+ val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+ Some(sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.locationUri
+ }
+ val result = saveDataIntoTable(
+ sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def saveDataIntoTable(
+ session: SparkSession,
+ table: CatalogTable,
+ tableLocation: Option[URI],
+ data: LogicalPlan,
+ mode: SaveMode,
+ tableExists: Boolean): BaseRelation = {
+ // Create the relation based on the input logical plan: `data`.
+ val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+ val dataSource = DataSource(
+ session,
+ className = table.provider.get,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ options = table.storage.properties ++ pathOption,
+ catalogTable = if (tableExists) {
+ Some(table)
+ } else {
+ None
+ })
+
+ try {
+ dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+ throw ex
+ }
+ }
+}