Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/05/11 05:19:23 UTC

[carbondata] branch master updated: [CARBONDATA-3810] Partition column name should be case insensitive

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e51903c  [CARBONDATA-3810] Partition column name should be case insensitive
e51903c is described below

commit e51903c2188d7dae84aeeebe9ab2103a62619339
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu May 7 16:42:40 2020 +0800

    [CARBONDATA-3810] Partition column name should be case insensitive
    
    Why is this PR needed?
    When inserting into a static partition, the partition column name is currently case sensitive.
    
    What changes were proposed in this PR?
    Make the partition column name case insensitive by converting all partition column names to lower case before the insert command is built.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3753
---
 .../sql/execution/strategy/CarbonPlanHelper.scala  | 16 +++++++--
 .../spark/sql/execution/strategy/DDLStrategy.scala | 14 ++------
 .../StandardPartitionTableLoadingTestCase.scala    | 38 +++++++++++++++++++++-
 pom.xml                                            |  5 +--
 4 files changed, 56 insertions(+), 17 deletions(-)
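
The core of the fix is normalizing the partition spec keys before building CarbonInsertIntoCommand (see the CarbonPlanHelper diff below). A minimal standalone sketch of that conversion, assuming the static partition spec arrives as a Map[String, Option[String]] as in the Spark plan; the values here are hypothetical and only illustrate the lower-casing step:

  // hypothetical illustration of the lower-casing applied in CarbonPlanHelper.insertInto
  val partitionSpec: Map[String, Option[String]] =
    Map("C1" -> Some("1"), "C2" -> Some("111"), "c3" -> Some("2019-11-18"))

  // convert every partition column name to lower case so that the casing
  // typed by the user no longer has to match the table schema exactly
  val normalized: Map[String, Option[String]] =
    partitionSpec.map { case (name, value) => (name.toLowerCase, value) }

  // normalized == Map("c1" -> Some("1"), "c2" -> Some("111"), "c3" -> Some("2019-11-18"))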

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
index 68fb2fe..b1e53c2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, InsertIntoCarbonTable, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.{ExecutedCommandExec, RunnableCommand}
-import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructField
@@ -34,6 +34,18 @@ import org.apache.carbondata.core.util.DataTypeUtil
 
 object CarbonPlanHelper {
 
+  def insertInto(insertInto: InsertIntoCarbonTable): CarbonInsertIntoCommand = {
+    CarbonInsertIntoCommand(
+      databaseNameOp = Some(insertInto.table.carbonRelation.databaseName),
+      tableName = insertInto.table.carbonRelation.tableName,
+      options = scala.collection.immutable
+        .Map("fileheader" -> insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")),
+      isOverwriteTable = insertInto.overwrite,
+      logicalPlan = insertInto.child,
+      tableInfo = insertInto.table.carbonRelation.carbonTable.getTableInfo,
+      partition = insertInto.partition.map(entry => (entry._1.toLowerCase, entry._2)))
+  }
+
   def addColumn(
       addColumnCommand: CarbonAlterTableAddColumnCommand,
       sparkSession: SparkSession
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bf6fefd..c14161d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -59,18 +59,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       case loadData: LoadDataCommand
         if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData, sparkSession)) :: Nil
-      case InsertIntoCarbonTable(
-      relation: CarbonDatasourceHadoopRelation, partition, child: LogicalPlan, overwrite, _) =>
-        ExecutedCommandExec(
-          CarbonInsertIntoCommand(
-            databaseNameOp = Some(relation.carbonRelation.databaseName),
-            tableName = relation.carbonRelation.tableName,
-            options = scala.collection.immutable
-              .Map("fileheader" -> relation.tableSchema.get.fields.map(_.name).mkString(",")),
-            isOverwriteTable = overwrite,
-            logicalPlan = child,
-            tableInfo = relation.carbonRelation.carbonTable.getTableInfo,
-            partition = partition)) :: Nil
+      case insert: InsertIntoCarbonTable =>
+        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
       case insert: InsertIntoHadoopFsRelationCommand
         if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
         DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index d05b9ac..796dd50 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.Strings
@@ -519,6 +519,40 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table origin_csv")
   }
 
+  test("test partition column case insensitive: insert into") {
+    sql(
+      """create table cs_insert_p
+        |(id int, Name string)
+        |stored as carbondata
+        |partitioned by (c1 int, c2 int, C3 string)""".stripMargin)
+    sql("alter table cs_insert_p drop if exists partition(C1=1, C2=111, c3='2019-11-18')")
+    sql("alter table cs_insert_p add if not exists partition(C1=1, c2=111, C3='2019-11-18')")
+    sql(
+      """insert into table cs_insert_p
+        | partition(c1=3, C2=111, c3='2019-11-18')
+        | select 200, 'cc'""".stripMargin)
+    checkAnswer(sql("select count(*) from cs_insert_p"), Seq(Row(1)))
+    sql("alter table cs_insert_p drop if exists partition(C1=3, C2=111, c3='2019-11-18')")
+    checkAnswer(sql("select count(*) from cs_insert_p"), Seq(Row(0)))
+  }
+
+  test("test partition column case insensitive: load data") {
+    sql(
+      """
+        | CREATE TABLE cs_load_p (doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empnO int, empnAme String, designaTion String)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cs_load_p PARTITION(empNo='99', empName='ravi', Designation='xx')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cs_load_p PARTITION(empno='100', emPname='indra', designation='yy')""")
+    checkAnswer(sql("show partitions cs_load_p"), Seq(
+      Row("empno=100/empname=indra/designation=yy"),
+      Row("empno=99/empname=ravi/designation=xx")))
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(
@@ -628,6 +662,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists restorepartition")
     sql("drop table if exists casesensitivepartition")
     sql("drop table if exists new_par")
+    sql("drop table if exists cs_insert_p")
+    sql("drop table if exists cs_load_p")
   }
 
 }
diff --git a/pom.xml b/pom.xml
index 0a9db3f..e2412db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
     <scala.version>2.11.8</scala.version>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.version>2.3.4</spark.version>
+    <spark.binary.version>2.3</spark.binary.version>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
     <dev.path>${basedir}/dev</dev.path>
@@ -548,7 +549,7 @@
           <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>flatten-maven-plugin</artifactId>
-            <!--<version>1.2.2</version>-->
+            <version>1.2.2</version>
             <configuration>
             </configuration>
             <executions>
@@ -619,7 +620,7 @@
           <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>flatten-maven-plugin</artifactId>
-            <!--<version>1.2.2</version>-->
+            <version>1.2.2</version>
             <configuration>
             </configuration>
             <executions>