You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:09 UTC
[13/50] [abbrv] carbondata git commit: [CARBONDATA-1884] Add CTAS
support to carbondata
[CARBONDATA-1884] Add CTAS support to carbondata
Implemented CTAS feature in carbondata
This closes #1665
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59de7cdb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59de7cdb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59de7cdb
Branch: refs/heads/fgdatamap
Commit: 59de7cdbd234d921af1a4f535d240943778a775d
Parents: 54eedfe
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Dec 13 21:40:19 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Dec 19 14:43:57 2017 +0800
----------------------------------------------------------------------
.../createTable/TestCreateTableAsSelect.scala | 161 +++++++++++++++++++
.../TestDataWithDicExcludeAndInclude.scala | 11 --
.../spark/util/CarbonReflectionUtils.scala | 2 +-
.../CarbonCreateTableAsSelectCommand.scala | 113 +++++++++++++
.../spark/sql/hive/CarbonFileMetastore.scala | 44 +++++
.../apache/spark/sql/hive/CarbonMetaStore.scala | 28 +++-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 56 +++++--
.../src/main/spark2.1/CarbonSessionState.scala | 26 +--
.../src/main/spark2.2/CarbonSessionState.scala | 26 +--
9 files changed, 417 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
new file mode 100644
index 0000000..ffe6261
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+/**
+ * test functionality for create table as select command
+ */
+class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
+
+ /** Creates one carbon, one parquet and one ORC source table and loads two rows into each. */
+ private def createTablesAndInsertData {
+ // create carbon table and insert data
+ sql("CREATE TABLE carbon_ctas_test(key INT, value STRING) STORED by 'carbondata'")
+ sql("insert into carbon_ctas_test select 100,'spark'")
+ sql("insert into carbon_ctas_test select 200,'hive'")
+
+ // create parquet table and insert data
+ sql("CREATE TABLE parquet_ctas_test(key INT, value STRING) STORED as parquet")
+ sql("insert into parquet_ctas_test select 100,'spark'")
+ sql("insert into parquet_ctas_test select 200,'hive'")
+
+ // create ORC table and insert data
+ sql("CREATE TABLE orc_ctas_test(key INT, value STRING) STORED as ORC")
+ sql("insert into orc_ctas_test select 100,'spark'")
+ sql("insert into orc_ctas_test select 200,'hive'")
+ }
+
+ /** Drops the three source tables and every table created by the individual tests. */
+ private def dropAllTestTables(): Unit = {
+ // source tables
+ sql("DROP TABLE IF EXISTS carbon_ctas_test")
+ sql("DROP TABLE IF EXISTS parquet_ctas_test")
+ sql("DROP TABLE IF EXISTS orc_ctas_test")
+ // tables created by individual test cases
+ sql("DROP TABLE IF EXISTS ctas_same_table_name")
+ sql("DROP TABLE IF EXISTS ctas_select_carbon")
+ sql("DROP TABLE IF EXISTS ctas_select_parquet")
+ sql("DROP TABLE IF EXISTS ctas_select_orc")
+ sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+ sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+ sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+ sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ }
+
+ override def beforeAll {
+ dropAllTestTables()
+ createTablesAndInsertData
+ }
+
+ test("test create table as select with select from same table name when table exists") {
+ sql("drop table if exists ctas_same_table_name")
+ sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
+ intercept[Exception] {
+ sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+ }
+ }
+
+ test("test create table as select with select from same table name when table does not exists") {
+ sql("drop table if exists ctas_same_table_name")
+ intercept[Exception] {
+ sql("create table ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+ }
+ }
+
+ test("test create table as select with select from same table name with if not exists clause") {
+ sql("drop table if exists ctas_same_table_name")
+ sql("CREATE TABLE ctas_same_table_name(key INT, value STRING) STORED by 'carbondata'")
+ sql("create table if not exists ctas_same_table_name stored by 'carbondata' as select * from ctas_same_table_name")
+ // the CTAS above must be a no-op: the pre-existing (empty) table is left untouched
+ checkAnswer(sql("select * from ctas_same_table_name"), Seq.empty)
+ }
+
+ test("test create table as select with select from another carbon table") {
+ sql("DROP TABLE IF EXISTS ctas_select_carbon")
+ sql("create table ctas_select_carbon stored by 'carbondata' as select * from carbon_ctas_test")
+ checkAnswer(sql("select * from ctas_select_carbon"), sql("select * from carbon_ctas_test"))
+ }
+
+ test("test create table as select with select from another parquet table") {
+ sql("DROP TABLE IF EXISTS ctas_select_parquet")
+ sql("create table ctas_select_parquet stored by 'carbondata' as select * from parquet_ctas_test")
+ checkAnswer(sql("select * from ctas_select_parquet"), sql("select * from parquet_ctas_test"))
+ }
+
+ test("test create table as select with select from another hive/orc table") {
+ sql("DROP TABLE IF EXISTS ctas_select_orc")
+ sql("create table ctas_select_orc stored by 'carbondata' as select * from orc_ctas_test")
+ checkAnswer(sql("select * from ctas_select_orc"), sql("select * from orc_ctas_test"))
+ }
+
+ test("test create table as select with where clause in select from carbon table that returns data") {
+ sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+ sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100")
+ checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=100"))
+ }
+
+ test(
+ "test create table as select with where clause in select from carbon table that does not return data") {
+ sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+ sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=300")
+ checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test where key=300"))
+ }
+
+ test("test create table as select with where clause in select from carbon table and load again") {
+ sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
+ sql("create table ctas_select_where_carbon stored by 'carbondata' as select * from carbon_ctas_test where key=100")
+ sql("insert into ctas_select_where_carbon select 200,'hive'")
+ checkAnswer(sql("select * from ctas_select_where_carbon"), sql("select * from carbon_ctas_test"))
+ }
+
+ test("test create table as select with where clause in select from parquet table") {
+ sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
+ sql("create table ctas_select_where_parquet stored by 'carbondata' as select * from parquet_ctas_test where key=100")
+ checkAnswer(sql("select * from ctas_select_where_parquet"), sql("select * from parquet_ctas_test where key=100"))
+ }
+
+ test("test create table as select with where clause in select from hive/orc table") {
+ sql("DROP TABLE IF EXISTS ctas_select_where_orc")
+ sql("create table ctas_select_where_orc stored by 'carbondata' as select * from orc_ctas_test where key=100")
+ checkAnswer(sql("select * from ctas_select_where_orc"), sql("select * from orc_ctas_test where key=100"))
+ }
+
+ test("test create table as select with select directly having the data") {
+ sql("DROP TABLE IF EXISTS ctas_select_direct_data")
+ sql("create table ctas_select_direct_data stored by 'carbondata' as select 300,'carbondata'")
+ checkAnswer(sql("select * from ctas_select_direct_data"), Seq(Row(300,"carbondata")))
+ }
+
+ test("test create table as select with TBLPROPERTIES") {
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ sql(
+ "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+ "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
+ checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+ val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
+ .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+ .asInstanceOf[CarbonRelation].carbonTable
+ val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetaDataFilepath)
+ assert(metadataFolderPath.exists())
+ // DICTIONARY_INCLUDE on 'key' must have produced dictionary/sortindex files
+ val dictFiles: Array[CarbonFile] = metadataFolderPath.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(".dict") || file.getName.contains(".sortindex")
+ }
+ })
+ assert(dictFiles.length == 3)
+ }
+
+ override def afterAll {
+ // drop every table created by this suite, not just the three source tables
+ dropAllTestTables()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index 484c304..c788857 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@ -100,17 +100,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA
).message.contains("Operation not allowed: CREATE EXTERNAL TABLE"))
}
- test("test CTAS should fail") {
- assert(intercept[AnalysisException](
- sql(
- """
- | CREATE TABLE t1 (id string, value int)
- | STORED BY 'carbondata'
- | AS SELECT 'ABC', 1 FROM t2
- """.stripMargin)
- ).message.contains("Operation not allowed: CREATE TABLE AS SELECT"))
- }
-
override def afterAll {
dropTable
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index ba51077..19b967a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -189,7 +189,7 @@ object CarbonReflectionUtils {
createObject(
"org.apache.spark.sql.hive.CarbonSqlAstBuilder",
conf,
- sqlParser)._1.asInstanceOf[AstBuilder]
+ sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
new file mode 100644
index 0000000..26a8f6f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableInfo the Table Describe, which may contains serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise
+ * raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(
+ tableInfo: TableInfo,
+ query: LogicalPlan,
+ ifNotExistsSet: Boolean = false,
+ tableLocation: Option[String] = None) extends AtomicRunnableCommand {
+
+ /**
+ * true only when this command itself created the table. Used by
+ * processData (whether to run the insert) and undoMetadata (whether
+ * dropping the table on rollback is safe).
+ */
+ var isTableCreated: Boolean = false
+
+ /** Resolves the target database name from the table info, falling back to the current database. */
+ private def resolveDbName(sparkSession: SparkSession): String = {
+ val databaseOpt = Option(tableInfo.getDatabaseName)
+ CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+ }
+
+ /**
+ * Creates the target carbon table unless it already exists.
+ *
+ * @throws TableAlreadyExistsException when the table exists and IF NOT EXISTS was not given
+ */
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = tableInfo.getFactTable.getTableName
+ val dbName = resolveDbName(sparkSession)
+ LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+ // check if table already exists
+ if (sparkSession.sessionState.catalog.listTables(dbName)
+ .exists(_.table.equalsIgnoreCase(tableName))) {
+ if (!ifNotExistsSet) {
+ LOGGER.audit(
+ s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+ s"Table [$tableName] already exists under database [$dbName]")
+ throw new TableAlreadyExistsException(dbName, tableName)
+ }
+ // IF NOT EXISTS with a pre-existing table: silently skip creation and load
+ } else {
+ // execute command to create carbon table
+ CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession)
+ isTableCreated = true
+ }
+ Seq.empty
+ }
+
+ /** Inserts the query result into the table, but only if this command created it. */
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ if (isTableCreated) {
+ val tableName = tableInfo.getFactTable.getTableName
+ val dbName = resolveDbName(sparkSession)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .createCarbonDataSourceHadoopRelation(sparkSession,
+ TableIdentifier(tableName, Option(dbName)))
+ // execute command to load data into carbon table
+ CarbonInsertIntoCommand(
+ carbonDataSourceHadoopRelation,
+ query,
+ overwrite = false,
+ partition = Map.empty).run(sparkSession)
+ LOGGER.audit(s"CTAS operation completed successfully for $dbName.$tableName")
+ }
+ Seq.empty
+ }
+
+ /** Rolls back metadata changes: drops the table, but only if this command created it. */
+ override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+ // guard: when the table pre-existed (IF NOT EXISTS path) we must not drop a
+ // table this command does not own
+ if (isTableCreated) {
+ val tableName = tableInfo.getFactTable.getTableName
+ val dbName = resolveDbName(sparkSession)
+ // drop the created table; ifExistsSet = true so a partially-failed create
+ // does not turn the rollback itself into an error
+ CarbonDropTableCommand(
+ ifExistsSet = true,
+ Option(dbName), tableName).run(sparkSession)
+ }
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index f7a1eed..ba222e2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
+import java.net.URI
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
@@ -528,4 +529,47 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tableMetadataFile = tablePath.getSchemaFilePath
CarbonUtil.readSchemaFile(tableMetadataFile)
}
+
+ /**
+ * Resolves the given table identifier through the session catalog and
+ * extracts a [[CarbonDatasourceHadoopRelation]] from the resulting plan.
+ * Handles plain LogicalRelation and SubqueryAlias-wrapped shapes (Spark 2.1)
+ * as well as Spark 2.2 catalog relation classes matched by class name via
+ * reflection. Throws NoSuchTableException for any other relation.
+ */
+ override def createCarbonDataSourceHadoopRelation(
+ sparkSession: SparkSession,
+ tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation = {
+ val relation: LogicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
+ relation match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
+ carbonDataSourceHadoopRelation
+ case LogicalRelation(
+ carbonDataSourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ carbonDataSourceHadoopRelation
+ // Spark 2.2 catalog relations are matched by class name because the
+ // classes differ across minor versions and are not on the 2.1 classpath
+ case SubqueryAlias(_, c)
+ if SPARK_VERSION.startsWith("2.2") &&
+ (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ c.getClass.getName
+ .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+ c.getClass.getName.equals(
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+ val catalogTable =
+ CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
+ // validation only: the matched value is discarded; any non-CarbonSource
+ // provider is rejected as "no such table"
+ catalogTable.provider match {
+ case Some(name) if name.equals("org.apache.spark.sql.CarbonSource") => name
+ case _ =>
+ throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+ }
+ // NOTE(review): the Some branch below is an if/else-if with no final else,
+ // so for a location that is neither String nor URI it evaluates to Unit and
+ // the asInstanceOf[String] cast further down would fail at runtime — confirm
+ // locationUri is always String (Spark 2.1) or URI (Spark 2.2)
+ val tableLocation = catalogTable.storage.locationUri match {
+ case tableLoc@Some(uri) =>
+ if (tableLoc.get.isInstanceOf[String]) {
+ FileFactory.getUpdatedFilePath(tableLoc.get.asInstanceOf[String])
+ } else if (tableLoc.get.isInstanceOf[URI]) {
+ FileFactory.getUpdatedFilePath(tableLoc.get.asInstanceOf[URI].getPath)
+ }
+ case None =>
+ CarbonEnv.getTablePath(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+ }
+ CarbonDatasourceHadoopRelation(sparkSession,
+ Array(tableLocation.asInstanceOf[String]),
+ catalogTable.storage.properties,
+ Option(catalogTable.schema))
+ // NOTE(review): tableIdentifier.database.get throws if database is None — verify callers
+ // always pass a qualified identifier, or the reported error will be a NoSuchElementException
+ case _ => throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 5a8e58b..cc0e6ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -16,9 +16,10 @@
*/
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame, Dataset, RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
@@ -144,6 +145,31 @@ trait CarbonMetaStore {
def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
+
+ /**
+ * Method will be used to retrieve or create carbon data source relation
+ *
+ * @param sparkSession
+ * @param tableIdentifier
+ * @return
+ */
+ def createCarbonDataSourceHadoopRelation(
+ sparkSession: SparkSession,
+ tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation
+
+ /**
+ * Analyses the given (possibly unresolved) logical plan via the session
+ * and returns the schema of the rows it would produce.
+ *
+ * @param sparkSession session used to resolve and analyse the plan
+ * @param query logical plan whose output schema is required
+ * @return schema of the plan's output
+ */
+ def getSchemaFromUnresolvedRelation(
+ sparkSession: SparkSession,
+ query: LogicalPlan): StructType = {
+ Dataset.ofRows(sparkSession, query).schema
+ }
}
/**
* Factory for Carbon metastore
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 79095ca..3597208 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -26,14 +26,12 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -78,7 +76,9 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
}
}
-class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
+class CarbonHelperSqlAstBuilder(conf: SQLConf,
+ parser: CarbonSpark2SqlParser,
+ sparkSession: SparkSession)
extends SparkSqlAstBuilder(conf) {
def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
@@ -147,7 +147,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
tablePropertyList : TablePropertyListContext,
locationSpecContext: SqlBaseParser.LocationSpecContext,
tableComment : Option[String],
- ctas: TerminalNode) : LogicalPlan = {
+ ctas: TerminalNode,
+ query: QueryContext) : LogicalPlan = {
// val parser = new CarbonSpark2SqlParser
val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
@@ -166,9 +167,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader)
}
- if (ctas != null && columns != null) {
- operationNotAllowed("CREATE TABLE AS SELECT", tableHeader)
- }
val cols = Option(columns).toSeq.flatMap(visitColTypeList)
val properties = getPropertyKeyValues(tablePropertyList)
@@ -210,13 +208,33 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
}
}
- val fields = parser.getFields(cols ++ partitionByStructFields)
val options = new CarbonOption(properties)
+ // validate streaming property
+ validateStreamingProperty(options)
+ var fields = parser.getFields(cols ++ partitionByStructFields)
+ // validate for create table as select
+ val selectQuery = Option(query).map(plan)
+ selectQuery match {
+ case Some(q) =>
+ // create table as select does not allow creation of partitioned table
+ if (partitionFields.nonEmpty) {
+ val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+ "create a partitioned table using Carbondata file formats."
+ operationNotAllowed(errorMessage, partitionColumns)
+ }
+ // create table as select does not allow to explicitly specify schema
+ if (fields.nonEmpty) {
+ operationNotAllowed(
+ "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
+ }
+ fields = parser
+ .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
+ case _ =>
+ // ignore this case
+ }
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-
- validateStreamingProperty(options)
-
// prepare table model of the collected tokens
val tableModel: TableModel = parser.prepareTableModel(
ifNotExists,
@@ -228,7 +246,19 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
bucketFields,
isAlterFlow = false,
tableComment)
- CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, tablePath)
+
+ selectQuery match {
+ case query@Some(q) =>
+ CarbonCreateTableAsSelectCommand(
+ TableNewProcessor(tableModel),
+ query.get,
+ tableModel.ifNotExistsSet,
+ tablePath)
+ case _ =>
+ CarbonCreateTableCommand(TableNewProcessor(tableModel),
+ tableModel.ifNotExistsSet,
+ tablePath)
+ }
}
private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index c198613..b55a6aa 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -259,25 +259,27 @@ object CarbonOptimizerUtil {
}
}
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
- SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
- val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(ctx.createTableHeader,
- ctx.skewSpec,
- ctx.bucketSpec,
- ctx.partitionColumns,
- ctx.columns,
- ctx.tablePropertyList,
- ctx.locationSpec,
- Option(ctx.STRING()).map(string),
- ctx.AS)
+ helper.createCarbonTable(
+ tableHeader = ctx.createTableHeader,
+ skewSpecContext = ctx.skewSpec,
+ bucketSpecContext = ctx.bucketSpec,
+ partitionColumns = ctx.partitionColumns,
+ columns = ctx.columns,
+ tablePropertyList = ctx.tablePropertyList,
+ locationSpecContext = ctx.locationSpec(),
+ tableComment = Option(ctx.STRING()).map(string),
+ ctas = ctx.AS,
+ query = ctx.query)
} else {
super.visitCreateTable(ctx)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59de7cdb/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 8acddfa..c951e5e 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -280,25 +280,27 @@ class CarbonOptimizer(
}
}
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
- SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
- val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(ctx.createTableHeader,
- ctx.skewSpec,
- ctx.bucketSpec,
- ctx.partitionColumns,
- ctx.columns,
- ctx.tablePropertyList,
- ctx.locationSpec(),
- Option(ctx.STRING()).map(string),
- ctx.AS)
+ helper.createCarbonTable(
+ tableHeader = ctx.createTableHeader,
+ skewSpecContext = ctx.skewSpec,
+ bucketSpecContext = ctx.bucketSpec,
+ partitionColumns = ctx.partitionColumns,
+ columns = ctx.columns,
+ tablePropertyList = ctx.tablePropertyList,
+ locationSpecContext = ctx.locationSpec(),
+ tableComment = Option(ctx.STRING()).map(string),
+ ctas = ctx.AS,
+ query = ctx.query)
} else {
super.visitCreateHiveTable(ctx)
}