You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:45 UTC

[27/50] [abbrv] carbondata git commit: [CARBONDATA-1968] Add external table support

[CARBONDATA-1968] Add external table support

This PR adds support for creating an external table on top of existing carbondata files, using Hive syntax:
CREATE EXTERNAL TABLE tableName STORED BY 'carbondata' LOCATION 'path'

This closes #1749


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/111c3821
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/111c3821
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/111c3821

Branch: refs/heads/fgdatamap
Commit: 111c3821557820241d1114d87eae2f7cd017e610
Parents: 952665a
Author: Jacky Li <ja...@qq.com>
Authored: Tue Jan 2 23:46:14 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Feb 1 00:15:26 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  9 ++
 .../createTable/TestCreateExternalTable.scala   | 91 ++++++++++++++++++++
 .../TestDataWithDicExcludeAndInclude.scala      | 10 ---
 .../command/table/CarbonDropTableCommand.scala  |  5 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 64 +++++++++-----
 5 files changed, 147 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/111c3821/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4bb0d20..07989b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -785,6 +785,15 @@ public class CarbonTable implements Serializable {
         && !tableInfo.getParentRelationIdentifiers().isEmpty();
   }
 
+  /**
+   * Returns true when the fact table has the internal property "_external" set to "true"
+   * (compared case-insensitively); the parser adds it when creating an external table.
+   */
+  public boolean isExternalTable() {
+    String externalFlag = tableInfo.getFactTable().getTableProperties().get("_external");
+    return "true".equalsIgnoreCase(externalFlag);
+  }
+
   public long size() throws IOException {
     Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111c3821/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
new file mode 100644
index 0000000..67370eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/** Tests for CREATE EXTERNAL TABLE ... STORED BY 'carbondata' LOCATION '<path>'. */
+class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
+  var originDataPath: String = _
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS origin")
+    // create carbon table and insert data; its folder is reused as the external LOCATION
+    sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'")
+    sql("INSERT INTO origin select 100,'spark'")
+    sql("INSERT INTO origin select 200,'hive'")
+    originDataPath = s"$storeLocation/origin"
+  }
+
+  override def afterAll(): Unit = {
+    // drop "source" too, in case a test failed before reaching its own cleanup
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS origin")
+  }
+
+  test("create external table with existing files") {
+    assert(new File(originDataPath).exists())
+    sql("DROP TABLE IF EXISTS source")
+    // create external table with existing files
+    sql(
+      s"""
+         |CREATE EXTERNAL TABLE source
+         |STORED BY 'carbondata'
+         |LOCATION '$originDataPath'
+       """.stripMargin)
+    checkAnswer(sql("SELECT count(*) from source"), sql("SELECT count(*) from origin"))
+
+    val carbonTable = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession)
+    assert(carbonTable.isExternalTable)
+    sql("DROP TABLE IF EXISTS source")
+
+    // DROP TABLE should not delete data
+    assert(new File(originDataPath).exists())
+  }
+
+  test("create external table with empty folder") {
+    val exception = intercept[AnalysisException] {
+      sql(
+        s"""
+           |CREATE EXTERNAL TABLE source
+           |STORED BY 'carbondata'
+           |LOCATION './nothing'
+         """.stripMargin)
+    }
+    assert(exception.getMessage().contains("Invalid table path provided"))
+  }
+
+  test("create external table with CTAS") {
+    val exception = intercept[AnalysisException] {
+      sql(
+        """
+          |CREATE EXTERNAL TABLE source
+          |STORED BY 'carbondata'
+          |LOCATION './nothing'
+          |AS
+          | SELECT * FROM origin
+        """.stripMargin)
+    }
+    assert(exception.getMessage().contains("Create external table as select"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111c3821/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
index c788857..201da39 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataWithDicExcludeAndInclude.scala
@@ -90,16 +90,6 @@ class TestLoadDataWithDictionaryExcludeAndInclude extends QueryTest with BeforeA
     )
   }
 
-  test("test create external table should fail") {
-    assert(intercept[AnalysisException](
-      sql(
-        """
-          | CREATE EXTERNAL TABLE t1 (id string, value int)
-          | STORED BY 'carbondata'
-        """.stripMargin)
-    ).message.contains("Operation not allowed: CREATE EXTERNAL TABLE"))
-  }
-
   override def afterAll {
     dropTable
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111c3821/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9c0eb57..5e00914 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -144,7 +144,10 @@ case class CarbonDropTableCommand(
       // delete the table folder
       val tablePath = carbonTable.getTablePath
       val fileType = FileFactory.getFileType(tablePath)
-      if (FileFactory.isFileExist(tablePath, fileType)) {
+
+      // delete table data only if it is not an external table
+      if (FileFactory.isFileExist(tablePath, fileType) &&
+          !carbonTable.isExternalTable) {
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111c3821/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 4b77417..ad6d0c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -164,9 +166,6 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     if (bucketSpecContext != null) {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
     }
-    if (external) {
-      operationNotAllowed("CREATE EXTERNAL TABLE", tableHeader)
-    }
 
     val cols = Option(columns).toSeq.flatMap(visitColTypeList)
     val properties = getPropertyKeyValues(tablePropertyList)
@@ -231,6 +230,10 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           operationNotAllowed(
             "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
         }
+        // external table is not allowed in CTAS (CREATE TABLE ... AS SELECT)
+        if (external) {
+          operationNotAllowed("Create external table as select", tableHeader)
+        }
         fields = parser
           .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
             .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
@@ -242,29 +245,48 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-    // prepare table model of the collected tokens
-    val tableModel: TableModel = parser.prepareTableModel(
-      ifNotExists,
-      convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
-      fields,
-      partitionFields,
-      tableProperties,
-      bucketFields,
-      isAlterFlow = false,
-      tableComment)
 
+    val tableInfo = if (external) {
+      // read table info from schema file in the provided table path
+      val identifier = AbsoluteTableIdentifier.from(
+        tablePath.get,
+        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
+        tableIdentifier.table)
+      val table = try {
+        SchemaReader.getTableInfo(identifier)
+      } catch {
+        case e: Throwable =>
+          operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
+      }
+      // set "_external" property, so that DROP TABLE will not delete the data
+      table.getFactTable.getTableProperties.put("_external", "true")
+      table
+    } else {
+      // prepare table model of the collected tokens
+      val tableModel: TableModel = parser.prepareTableModel(
+        ifNotExists,
+        convertDbNameToLowerCase(tableIdentifier.database),
+        tableIdentifier.table.toLowerCase,
+        fields,
+        partitionFields,
+        tableProperties,
+        bucketFields,
+        isAlterFlow = false,
+        tableComment)
+      TableNewProcessor(tableModel)
+    }
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(
-          TableNewProcessor(tableModel),
-          query.get,
-          tableModel.ifNotExistsSet,
-          tablePath)
+          tableInfo = tableInfo,
+          query = query.get,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath)
       case _ =>
-        CarbonCreateTableCommand(TableNewProcessor(tableModel),
-          tableModel.ifNotExistsSet,
-          tablePath)
+        CarbonCreateTableCommand(
+          tableInfo = tableInfo,
+          ifNotExistsSet = ifNotExists,
+          tableLocation = tablePath)
     }
   }