You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:25 UTC
[05/49] carbondata git commit: [CARBONDATA-1968] Add external table support
[CARBONDATA-1968] Add external table support
This PR adds support for creating an external table over existing carbondata files, using Hive syntax:
CREATE EXTERNAL TABLE tableName STORED BY 'carbondata' LOCATION 'path'
This closes #1749
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c2095089
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c2095089
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c2095089
Branch: refs/heads/carbonstore-rebase4
Commit: c2095089c5a6e203f44158248178fa11ce627925
Parents: bf3602f
Author: Jacky Li <ja...@qq.com>
Authored: Tue Jan 2 23:46:14 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Feb 26 23:55:29 2018 +0800
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 9 ++
.../createTable/TestCreateExternalTable.scala | 91 ++++++++++++++++++++
.../TestDataWithDicExcludeAndInclude.scala | 10 ---
.../command/table/CarbonDropTableCommand.scala | 5 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 64 +++++++++-----
5 files changed, 147 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2095089/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 09ff440..6036569 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -785,6 +785,15 @@ public class CarbonTable implements Serializable {
&& !tableInfo.getParentRelationIdentifiers().isEmpty();
}
+ /**
+ * Return true if this is an external table, i.e. a table whose property "_external" is
+ * "true". This is an internal table property that is set during table creation.
+ */
+ public boolean isExternalTable() {
+ String external = tableInfo.getFactTable().getTableProperties().get("_external");
+ return external != null && external.equalsIgnoreCase("true");
+ }
+
public long size() throws IOException {
Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2095089/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
new file mode 100644
index 0000000..67370eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
+
+ var originDataPath: String = _
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS origin")
+ // create carbon table and insert data
+ sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'")
+ sql("INSERT INTO origin select 100,'spark'")
+ sql("INSERT INTO origin select 200,'hive'")
+ originDataPath = s"$storeLocation/origin"
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS origin")
+ }
+
+ test("create external table with existing files") {
+ assert(new File(originDataPath).exists())
+ sql("DROP TABLE IF EXISTS source")
+
+ // create external table with existing files
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE source
+ |STORED BY 'carbondata'
+ |LOCATION '$storeLocation/origin'
+ """.stripMargin)
+ checkAnswer(sql("SELECT count(*) from source"), sql("SELECT count(*) from origin"))
+
+ val carbonTable = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession)
+ assert(carbonTable.isExternalTable)
+
+ sql("DROP TABLE IF EXISTS source")
+
+ // DROP TABLE should not delete data
+ assert(new File(originDataPath).exists())
+ }
+
+ test("create external table with empty folder") {
+ val exception = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE source
+ |STORED BY 'carbondata'
+ |LOCATION './nothing'
+ """.stripMargin)
+ }
+ assert(exception.getMessage().contains("Invalid table path provided"))
+ }
+
+ test("create external table with CTAS") {
+ val exception = intercept[AnalysisException] {
+ sql(
+ """
+ |CREATE EXTERNAL TABLE source
+ |STORED BY 'carbondata'
+ |LOCATION './nothing'
+ |AS
+ | SELECT * FROM origin
+ """.stripMargin)
+ }
+ assert(exception.getMessage().contains("Create external table as select"))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2095089/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index c788857..201da39 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@ -90,16 +90,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA
)
}
- test("test create external table should fail") {
- assert(intercept[AnalysisException](
- sql(
- """
- | CREATE EXTERNAL TABLE t1 (id string, value int)
- | STORED BY 'carbondata'
- """.stripMargin)
- ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE"))
- }
-
override def afterAll {
dropTable
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2095089/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 7c895ab..8001a93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -147,7 +147,10 @@ case class CarbonDropTableCommand(
// delete the table folder
val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
- if (FileFactory.isFileExist(tablePath, fileType)) {
+
+ // delete table data only if it is not external table
+ if (FileFactory.isFileExist(tablePath, fileType) &&
+ !carbonTable.isExternalTable) {
val file = FileFactory.getCarbonFile(tablePath, fileType)
CarbonUtil.deleteFoldersAndFilesSilent(file)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2095089/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 4b77417..ad6d0c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -164,9 +166,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
if (bucketSpecContext != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
}
- if (external) {
- operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader)
- }
val cols = Option(columns).toSeq.flatMap(visitColTypeList)
val properties = getPropertyKeyValues(tablePropertyList)
@@ -231,6 +230,10 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
}
+ // external tables are not allowed in a CTAS statement
+ if (external) {
+ operationNotAllowed("Create external table as select", tableHeader)
+ }
fields = parser
.getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
.getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
@@ -242,29 +245,48 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
}
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
- // prepare table model of the collected tokens
- val tableModel: TableModel = parser.prepareTableModel(
- ifNotExists,
- convertDbNameToLowerCase(tableIdentifier.database),
- tableIdentifier.table.toLowerCase,
- fields,
- partitionFields,
- tableProperties,
- bucketFields,
- isAlterFlow = false,
- tableComment)
+ val tableInfo = if (external) {
+ // read table info from schema file in the provided table path
+ val identifier = AbsoluteTableIdentifier.from(
+ tablePath.get,
+ CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
+ tableIdentifier.table)
+ val table = try {
+ SchemaReader.getTableInfo(identifier)
+ } catch {
+ case e: Throwable =>
+ operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
+ }
+ // set "_external" property, so that DROP TABLE will not delete the data
+ table.getFactTable.getTableProperties.put("_external", "true")
+ table
+ } else {
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = parser.prepareTableModel(
+ ifNotExists,
+ convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ partitionFields,
+ tableProperties,
+ bucketFields,
+ isAlterFlow = false,
+ tableComment)
+ TableNewProcessor(tableModel)
+ }
selectQuery match {
case query@Some(q) =>
CarbonCreateTableAsSelectCommand(
- TableNewProcessor(tableModel),
- query.get,
- tableModel.ifNotExistsSet,
- tablePath)
+ tableInfo = tableInfo,
+ query = query.get,
+ ifNotExistsSet = ifNotExists,
+ tableLocation = tablePath)
case _ =>
- CarbonCreateTableCommand(TableNewProcessor(tableModel),
- tableModel.ifNotExistsSet,
- tablePath)
+ CarbonCreateTableCommand(
+ tableInfo = tableInfo,
+ ifNotExistsSet = ifNotExists,
+ tableLocation = tablePath)
}
}