Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/10/07 11:15:38 UTC

[carbondata] branch master updated: [CARBONDATA-4293] Make Table created without external keyword as Transactional table

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a710f9  [CARBONDATA-4293] Make Table created without external keyword as Transactional table
5a710f9 is described below

commit 5a710f9d747ebd8b37bf4338b3e10d601a45dc91
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Sep 22 23:13:23 2021 +0530

    [CARBONDATA-4293] Make Table created without external keyword as Transactional table
    
    Why is this PR needed?
    Currently, when you create a table with a LOCATION clause (but without the
    EXTERNAL keyword) in cluster mode, the table is created as a transactional
    table. If the EXTERNAL keyword is present, it is created as a
    non-transactional table. This distinction is not handled in local mode.
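    
    For illustration, a minimal sketch of the two statements whose behavior
    should agree between cluster and local mode (the table names and paths are
    hypothetical, assuming a SparkSession with the Carbon extension installed):
    
        // No EXTERNAL keyword: expected to become a transactional table.
        spark.sql("CREATE TABLE src(a INT) STORED AS carbondata LOCATION '/tmp/src'")
        // EXTERNAL keyword present: expected to become a non-transactional table.
        spark.sql("CREATE EXTERNAL TABLE ext(a INT) STORED AS carbondata LOCATION '/tmp/ext'")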
    
    What changes were proposed in this PR?
    Added a check for whether the EXTERNAL keyword is present. If it is absent,
    the corresponding table is created as a transactional table.
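    
    The fix tags the parsed plan with an internal marker property. A simplified
    standalone sketch of that idea (the types below are stand-ins, not Spark's
    classes):
    
        // Stand-in for Spark's table description (hypothetical).
        case class TableDesc(location: Option[String], properties: Map[String, String])
    
        def tagIfNoExternalKeyword(desc: TableDesc, sqlText: String): TableDesc =
          if (desc.location.isDefined &&
              !sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) {
            // record that the statement had no EXTERNAL keyword
            desc.copy(properties = desc.properties + ("hasexternalkeyword" -> "false"))
          } else {
            desc
          }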
    
    This closes #4221
---
 .../org/apache/spark/sql/SparkVersionAdapter.scala | 18 ++++++++++++++-
 .../sql/parser/CarbonExtensionSqlParser.scala      |  5 ++--
 .../sql/parser/CarbonSparkSqlParserUtil.scala      | 12 ++++++++--
 .../org/apache/spark/sql/SparkVersionAdapter.scala | 20 +++++++++++++++-
 .../sql/parser/CarbonExtensionSqlParser.scala      |  5 ++--
 .../createTable/TestCreateExternalTable.scala      | 27 ++++++++++++++++++----
 6 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
index 23de19d..cfc580c 100644
--- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, RefreshTable}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, RefreshTable}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
@@ -430,6 +430,22 @@ trait SparkVersionAdapter {
   def evaluateWithPredicate(exp: Expression, schema: Seq[Attribute], row: InternalRow): Any = {
     InterpretedPredicate.create(exp, schema).expression.eval(row)
   }
+
+  def getUpdatedPlan(plan: LogicalPlan, sqlText: String): LogicalPlan = {
+    plan match {
+      case create@CreateTable(tableDesc, mode, query) =>
+        if (tableDesc.storage.locationUri.isDefined &&
+            !sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) {
+          // add a property to record that the CREATE TABLE statement lacks the EXTERNAL keyword
+          val newProperties = tableDesc.properties + ("hasexternalkeyword" -> "false")
+          val updatedTableDesc = tableDesc.copy(properties = newProperties)
+          CreateTable(updatedTableDesc, mode, query)
+        } else {
+          create
+        }
+      case others => others
+    }
+  }
 }
 
 case class CarbonBuildSide(buildSide: BuildSide) {
diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
index a57ca36..18d0f1b 100644
--- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
+++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parser
 
-import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlParser
@@ -57,7 +57,8 @@ class CarbonExtensionSqlParser(
             throw ce
           case at: Throwable =>
             try {
-              val parsedPlan = initialParser.parsePlan(sqlText)
+              val parsedPlan = CarbonToSparkAdapter.getUpdatedPlan(initialParser.parsePlan(sqlText),
+                sqlText)
               CarbonScalaUtil.cleanParserThreadLocals
               parsedPlan
             } catch {
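
Both CarbonExtensionSqlParser variants (Spark 2.3/2.4 above, Spark 3.1 below)
apply the same decoration: parse with the stock Spark parser, then route the
result through getUpdatedPlan. A standalone sketch of that shape, with simple
stand-ins for the parser and plan types (all names here are hypothetical):

    // Stand-ins for ParserInterface.parsePlan and the plan rewrite (hypothetical).
    def parsePlan(sqlText: String): Map[String, String] = Map("sql" -> sqlText)

    def getUpdatedPlan(plan: Map[String, String], sqlText: String): Map[String, String] =
      if (sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) plan
      else plan + ("hasexternalkeyword" -> "false")

    // The fallback path composes the two, as in the hunk above.
    def parseFallback(sqlText: String): Map[String, String] =
      getUpdatedPlan(parsePlan(sqlText), sqlText)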
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index e972fb3..6342acb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -194,7 +194,9 @@ object CarbonSparkSqlParserUtil {
       throw new MalformedCarbonCommandException(
         "Creating table without column(s) is not supported")
     }
-    if (isExternal && fields.isEmpty && tableProperties.nonEmpty) {
+    // filter out internally added external keyword property
+    val newTableProperties = tableProperties.filterNot(_._1.equalsIgnoreCase("hasexternalkeyword"))
+    if (isExternal && fields.isEmpty && newTableProperties.nonEmpty) {
       // as fields are always zero for external table, cannot validate table properties.
       throw new MalformedCarbonCommandException(
         "Table properties are not supported for external table")
@@ -232,17 +234,23 @@ object CarbonSparkSqlParserUtil {
         } catch {
           case e: Throwable =>
             if (fields.nonEmpty) {
+              val partitionerFields = fields
+                .filter(field => partitionColumnNames.contains(field.column))
+                .map(field => PartitionerField(field.column, field.dataType, null))
               val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
                 ifNotExists,
                 Some(identifier.getDatabaseName),
                 identifier.getTableName,
                 fields,
-                Seq.empty,
+                partitionerFields,
                 tblProperties,
                 bucketFields,
                 isAlterFlow = false,
                 table.comment
               )
+              if (table.properties.contains("hasexternalkeyword")) {
+                isTransactionalTable = true
+              }
               TableNewProcessor(tableModel)
             } else {
               throw new MalformedCarbonCommandException(
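
The filterNot above strips the internal marker before table-property
validation, so the marker itself can never trigger the "Table properties are
not supported for external table" error. A self-contained sketch of that
case-insensitive filter (property values are illustrative):

    val tableProperties = Map("hasExternalKeyword" -> "false", "sort_columns" -> "name")
    // drop the marker no matter how its key is cased
    val visible = tableProperties.filterNot(_._1.equalsIgnoreCase("hasexternalkeyword"))
    assert(visible == Map("sort_columns" -> "name"))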
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
index 21d7586..c80394d 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
 import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableStatement, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType}
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime, TimestampFormatter}
 import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
@@ -471,6 +471,24 @@ trait SparkVersionAdapter {
       overwrite,
       ifPartitionNotExists)
   }
+
+  def getUpdatedPlan(plan: LogicalPlan, sqlText: String): LogicalPlan = {
+    plan match {
+      case create@CreateTableStatement(_, _, _, _, properties, _, _,
+      location, _, _, _, _) =>
+        if (location.isDefined &&
+            !sqlText.toUpperCase.startsWith("CREATE EXTERNAL TABLE ")) {
+          // add a property to record that the CREATE TABLE statement lacks the EXTERNAL keyword
+          val newProperties = properties + ("hasexternalkeyword" -> "false")
+          CreateTableStatement(create.tableName, create.tableSchema, create.partitioning,
+            create.bucketSpec, newProperties, create.provider, create.options,
+            location, create.comment, create.serde, create.external, create.ifNotExists)
+        } else {
+          create
+        }
+      case others => others
+    }
+  }
 }
 
 case class CarbonBuildSide(buildSide: BuildSide) {
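
Since CreateTableStatement is a case class in Spark 3.1, the twelve-field
reconstruction above could equivalently use copy, which stays correct even if
the field order changes. An analogous copy-based rewrite on a simplified case
class (hypothetical, not the committed code):

    case class CreateStmt(
        name: Seq[String],
        properties: Map[String, String],
        location: Option[String])

    val create  = CreateStmt(Seq("db", "t"), Map.empty, Some("/tmp/t"))
    val updated = create.copy(properties = create.properties + ("hasexternalkeyword" -> "false"))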
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
index fc13330..bfa7715 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parser
 
-import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlParser
@@ -57,7 +57,8 @@ class CarbonExtensionSqlParser(
             throw ce
           case at: Throwable =>
             try {
-              val parsedPlan = initialParser.parsePlan(sqlText)
+              val parsedPlan = CarbonToSparkAdapter.getUpdatedPlan(initialParser.parsePlan(sqlText),
+                sqlText)
               CarbonScalaUtil.cleanParserThreadLocals
               parsedPlan
             } catch {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
index c8e797c..7cf6122 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -201,10 +202,28 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
          |stored as carbondata
          |LOCATION '$newStoreLocation'
        """.stripMargin)
-    val exception = intercept[Exception] {
-      sql("select * from source").show(false)
-    }
-    assert(exception.getMessage.contains("No Index files are present in the table location"))
+    val tableIdentifier = new TableIdentifier("source", Some("default"))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    assert(carbonTable.isTransactionalTable && carbonTable.isHivePartitionTable)
+    sql("INSERT INTO source select 100,'spark','test1'")
+    checkAnswer(sql("select * from source"), Seq(Row(100, "spark", "test1")))
+    sql("drop table if exists source")
+  }
+
+  test("test create table with location") {
+    // test non-partition table
+    val newStoreLocation = s"$storeLocation/origin1"
+    FileUtils.deleteDirectory(new File(newStoreLocation))
+    sql("drop table if exists source")
+    sql(
+      s"""
+         |CREATE TABLE source(a int, b string,c string)
+         |stored as carbondata
+         |LOCATION '$newStoreLocation'
+       """.stripMargin)
+    val tableIdentifier = new TableIdentifier("source", Some("default"))
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession)
+    assert(carbonTable.isTransactionalTable)
     sql("INSERT INTO source select 100,'spark','test1'")
     checkAnswer(sql("select * from source"), Seq(Row(100, "spark", "test1")))
     sql("drop table if exists source")