Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/08/31 12:28:10 UTC

[carbondata] branch master updated: [CARBONDATA-4274] Fix create partition table error with spark 3.1

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ca659b5  [CARBONDATA-4274] Fix create partition table error with spark 3.1
ca659b5 is described below

commit ca659b5cd3219de7c8a9603784559e20fdff1fda
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Thu Aug 19 20:26:00 2021 +0530

    [CARBONDATA-4274] Fix create partition table error with spark 3.1
    
    Why is this PR needed?
    With Spark 3.1, a partition table can be created by giving the
    partition columns as names taken from the table schema, as in
    the example below:
    create table partitionTable(c1 int, c2 int, v1 string, v2 string)
    stored as carbondata partitioned by (v2,c2)
    
    When the table is created through a SparkSession with CarbonExtensions,
    the catalog table is created with the specified partitions.
    But in a cluster, or with a CarbonSession, creating a partition table
    with the above syntax creates a normal table with no partitions.
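    
    As an illustration only (a sketch, assuming a SparkSession named
    spark created through CarbonSession in a cluster setup, not one
    with CarbonExtensions), the symptom can be reproduced like this:
    
        // Repro sketch; spark is assumed to be a CarbonSession-backed
        // SparkSession, where the bug appears before this fix.
        spark.sql(
          "create table partitionTable(c1 int, c2 int, v1 string, v2 string) " +
          "stored as carbondata partitioned by (v2,c2)")
        // Before the fix, "describe formatted" shows no "Partition Columns"
        // row; after the fix it reports v2:STRING, c2:INT.
        spark.sql("describe formatted partitionTable").show(100, false)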
    
    What changes were proposed in this PR?
    partitionByStructFields is empty when only the partition column
    names are given, so no partition table was created. Changes were
    made to identify the partition column names and to get the struct
    field and datatype info from the table columns.
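    
    The idea behind the change can be sketched in isolation as follows
    (a simplified, hypothetical helper, not the actual parser code;
    resolvePartitionFields and its error message are illustrative):
    
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
    
        // Hypothetical helper mirroring the fix: when only partition column
        // names are given, recover the typed StructFields from the schema.
        def resolvePartitionFields(
            cols: Seq[StructField],
            partitionNames: Seq[String]): Seq[StructField] = {
          partitionNames.map { partName =>
            cols.find(_.name.equals(partName)).getOrElse(
              throw new IllegalArgumentException(
                s"Partition column not present in the schema: $partName"))
          }
        }
    
        // partitioned by (v2, c2): names only, datatypes come from the schema
        val cols = Seq(
          StructField("c1", IntegerType), StructField("c2", IntegerType),
          StructField("v1", StringType), StructField("v2", StringType))
        resolvePartitionFields(cols, Seq("v2", "c2"))
        // -> fields for v2 (StringType) and c2 (IntegerType), appended last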
    
    This closes #4208
---
 .../org/apache/spark/sql/SparkVersionAdapter.scala | 16 ++++---------
 .../spark/sql/parser/CarbonSparkSqlParser.scala    | 27 ++++++++++++++++++++--
 .../StandardPartitionTableQueryTestCase.scala      | 21 +++++++++++++++++
 3 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
index 5cf5c15..21d7586 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -233,17 +233,6 @@ trait SparkVersionAdapter {
     val partitionerFields = partitionByStructFields.map { structField =>
       PartitionerField(structField.name, Some(structField.dataType.toString), null)
     }
-    // validate partition clause
-    if (partitionerFields.nonEmpty) {
-      // partition columns should not be part of the schema
-      val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet
-        .intersect(colNames.map(_.toLowerCase).toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")
-          , partitionColumns: PartitionFieldListContext)
-      }
-    }
     partitionerFields
   }
 
@@ -280,7 +269,10 @@ trait SparkVersionAdapter {
     val options = new CarbonOption(properties)
     // validate streaming property
     validateStreamingProperty(options)
-    var fields = parser.getFields(cols ++ partitionByStructFields)
+    // With Spark 3.1, partition columns may already be present in the schema.
+    // Check and remove them from the fields, then append the partition columns at the end.
+    val updatedCols = cols.filterNot(x => partitionByStructFields.contains(x))
+    var fields = parser.getFields(updatedCols ++ partitionByStructFields)
     // validate for create table as select
     selectQuery match {
       case Some(q) =>
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ac0be49..49978d4 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import org.antlr.v4.runtime.tree.TerminalNode
 import org.apache.spark.sql.{CarbonThreadUtil, CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
@@ -130,8 +131,30 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     val tableProperties = convertPropertiesToLowercase(properties)
 
     // validate partition clause
-    val partitionByStructFields = Option(partitionColumns).toSeq
-        .flatMap(x => visitPartitionFieldList(x)._2)
+    // There can be two scenarios when creating a partition table with Spark 3.1.
+    // Scenario 1: the partition columns are declared with a datatype. Here we get the struct
+    // fields from visitPartitionFieldList and the transform list is empty.
+    // Example syntax: create table example(col1 int) partitioned by(col2 int)
+    // Scenario 2: the partition columns are given only by name from the schema. Then the struct
+    // fields are empty, as no datatype is given, and the transform list holds field references
+    // with the partition column names; search the table columns to extract their struct fields.
+    // Example syntax: create table example(col1 int, col2 int) partitioned by(col2)
+    var (partitionTransformList,
+      partitionByStructFields) = visitPartitionFieldList(partitionColumns)
+    if (partitionByStructFields.isEmpty && partitionTransformList.nonEmpty) {
+      val partitionNames = partitionTransformList
+        .flatMap(_.references().flatMap(_.fieldNames()))
+      partitionNames.foreach { partName =>
+        val structField = cols.find(x => x.name.equals(partName))
+        if (structField.isDefined) {
+          partitionByStructFields = partitionByStructFields :+ structField.get
+        } else {
+          operationNotAllowed("Partition columns not present in the schema: " +
+                              partitionNames.mkString("[", ",", "]"),
+            partitionColumns: PartitionFieldListContext)
+        }
+      }
+    }
     val partitionFields = CarbonToSparkAdapter.
       validatePartitionFields(partitionColumns, colNames, tableProperties,
       partitionByStructFields)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 42da6c7..20a0d8c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -532,6 +532,27 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists onlyPart")
   }
 
+  if (CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
+      CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT).toBoolean) {
+    test("test create partition on existing table columns") {
+      sql("drop table if exists partitionTable")
+      sql("create table partitionTable(c1 int, c2 int, v1 string, v2 string) " +
+          "stored as carbondata partitioned by (v2,c2)")
+      val descTable = sql(s"describe formatted partitionTable").collect
+      descTable.find(_.get(0).toString.contains("Partition Columns")) match {
+        case Some(row) => assert(row.get(1).toString.contains("v2:STRING, c2:INT"))
+        case None => assert(false)
+      }
+      sql("insert into partitionTable select 1,'sd','sd',2")
+      sql("alter table partitionTable add partition (v2='ke', c2=3) location 'loc1'")
+      checkAnswer(sql("show partitions partitionTable"),
+        Seq(Row("v2=sd/c2=2"), Row("v2=ke/c2=3")))
+      checkAnswer(sql("select *from partitionTable"), Seq(Row(1, "sd", "sd", 2)))
+      sql("drop table if exists partitionTable")
+    }
+  }
+
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
     val scanRDD = plan collect {