You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/05/11 05:19:23 UTC
[carbondata] branch master updated: [CARBONDATA-3810] Partition
column name should be case insensitive
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new e51903c [CARBONDATA-3810] Partition column name should be case insensitive
e51903c is described below
commit e51903c2188d7dae84aeeebe9ab2103a62619339
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu May 7 16:42:40 2020 +0800
[CARBONDATA-3810] Partition column name should be case insensitive
Why is this PR needed?
When inserting into a static partition, the partition column name is currently case-sensitive.
What changes were proposed in this PR?
the partition column name should be case insensitive.
Convert all partition column names to lower case.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3753
---
.../sql/execution/strategy/CarbonPlanHelper.scala | 16 +++++++--
.../spark/sql/execution/strategy/DDLStrategy.scala | 14 ++------
.../StandardPartitionTableLoadingTestCase.scala | 38 +++++++++++++++++++++-
pom.xml | 5 +--
4 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
index 68fb2fe..b1e53c2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
@@ -18,12 +18,12 @@
package org.apache.spark.sql.execution.strategy
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, InsertIntoCarbonTable, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, RunnableCommand}
-import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.StructField
@@ -34,6 +34,18 @@ import org.apache.carbondata.core.util.DataTypeUtil
object CarbonPlanHelper {
+ def insertInto(insertInto: InsertIntoCarbonTable): CarbonInsertIntoCommand = {
+ CarbonInsertIntoCommand(
+ databaseNameOp = Some(insertInto.table.carbonRelation.databaseName),
+ tableName = insertInto.table.carbonRelation.tableName,
+ options = scala.collection.immutable
+ .Map("fileheader" -> insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")),
+ isOverwriteTable = insertInto.overwrite,
+ logicalPlan = insertInto.child,
+ tableInfo = insertInto.table.carbonRelation.carbonTable.getTableInfo,
+ partition = insertInto.partition.map(entry => (entry._1.toLowerCase, entry._2)))
+ }
+
def addColumn(
addColumnCommand: CarbonAlterTableAddColumnCommand,
sparkSession: SparkSession
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bf6fefd..c14161d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -59,18 +59,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
case loadData: LoadDataCommand
if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData, sparkSession)) :: Nil
- case InsertIntoCarbonTable(
- relation: CarbonDatasourceHadoopRelation, partition, child: LogicalPlan, overwrite, _) =>
- ExecutedCommandExec(
- CarbonInsertIntoCommand(
- databaseNameOp = Some(relation.carbonRelation.databaseName),
- tableName = relation.carbonRelation.tableName,
- options = scala.collection.immutable
- .Map("fileheader" -> relation.tableSchema.get.fields.map(_.name).mkString(",")),
- isOverwriteTable = overwrite,
- logicalPlan = child,
- tableInfo = relation.carbonRelation.carbonTable.getTableInfo,
- partition = partition)) :: Nil
+ case insert: InsertIntoCarbonTable =>
+ ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
case insert: InsertIntoHadoopFsRelationCommand
if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index d05b9ac..796dd50 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.Strings
@@ -519,6 +519,40 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("drop table origin_csv")
}
+ test("test partition column case insensitive: insert into") {
+ sql(
+ """create table cs_insert_p
+ |(id int, Name string)
+ |stored as carbondata
+ |partitioned by (c1 int, c2 int, C3 string)""".stripMargin)
+ sql("alter table cs_insert_p drop if exists partition(C1=1, C2=111, c3='2019-11-18')")
+ sql("alter table cs_insert_p add if not exists partition(C1=1, c2=111, C3='2019-11-18')")
+ sql(
+ """insert into table cs_insert_p
+ | partition(c1=3, C2=111, c3='2019-11-18')
+ | select 200, 'cc'""".stripMargin)
+ checkAnswer(sql("select count(*) from cs_insert_p"), Seq(Row(1)))
+ sql("alter table cs_insert_p drop if exists partition(C1=3, C2=111, c3='2019-11-18')")
+ checkAnswer(sql("select count(*) from cs_insert_p"), Seq(Row(0)))
+ }
+
+ test("test partition column case insensitive: load data") {
+ sql(
+ """
+ | CREATE TABLE cs_load_p (doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empnO int, empnAme String, designaTion String)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cs_load_p PARTITION(empNo='99', empName='ravi', Designation='xx')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cs_load_p PARTITION(empno='100', emPname='indra', designation='yy')""")
+ checkAnswer(sql("show partitions cs_load_p"), Seq(
+ Row("empno=100/empname=indra/designation=yy"),
+ Row("empno=99/empname=ravi/designation=xx")))
+ }
+
def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
sql(s"drop table if exists $tableName")
sql(
@@ -628,6 +662,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("drop table if exists restorepartition")
sql("drop table if exists casesensitivepartition")
sql("drop table if exists new_par")
+ sql("drop table if exists cs_insert_p")
+ sql("drop table if exists cs_load_p")
}
}
diff --git a/pom.xml b/pom.xml
index 0a9db3f..e2412db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
<scala.version>2.11.8</scala.version>
<hadoop.deps.scope>compile</hadoop.deps.scope>
<spark.version>2.3.4</spark.version>
+ <spark.binary.version>2.3</spark.binary.version>
<spark.deps.scope>compile</spark.deps.scope>
<scala.deps.scope>compile</scala.deps.scope>
<dev.path>${basedir}/dev</dev.path>
@@ -548,7 +549,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
- <!--<version>1.2.2</version>-->
+ <version>1.2.2</version>
<configuration>
</configuration>
<executions>
@@ -619,7 +620,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
- <!--<version>1.2.2</version>-->
+ <version>1.2.2</version>
<configuration>
</configuration>
<executions>