You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/09/23 14:18:32 UTC
[carbondata] branch master updated: [CARBONDATA-3520] CTAS should
fail if select query contains duplicate columns
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 07f8f79 [CARBONDATA-3520] CTAS should fail if select query contains duplicate columns
07f8f79 is described below
commit 07f8f7942b143ddf9c20a6df337fdce5f681a8a2
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Sep 16 16:25:03 2019 +0530
[CARBONDATA-3520] CTAS should fail if select query contains duplicate columns
Problem:
If the SELECT query contains duplicate column names, CTAS created
a table with only one column, which is wrong.
Solution:
Throw an error if the SELECT query contains duplicate columns.
This closes #3388
---
.../createTable/TestCreateTableAsSelect.scala | 37 ++++++++++++++++++++++
.../sql/parser/CarbonSparkSqlParserUtil.scala | 23 +++++++++++---
2 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 3896061..8e4d8fa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -407,6 +407,43 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 24.5)))
}
+ test("test duplicate columns with select query") {
+ // CTAS must be rejected when the SELECT list yields duplicate column names,
+ // and must succeed once the duplicates are aliased to distinct names.
+ sql("DROP TABLE IF EXISTS target_table")
+ sql("DROP TABLE IF EXISTS source_table")
+ // create carbon table and insert data
+ sql(
+ """
+ | CREATE TABLE source_table(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | age INT)
+ | STORED BY 'carbondata'
+ | """.stripMargin)
+ sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+ // Both projected columns are named "city", so table creation has to fail.
+ val e = intercept[AnalysisException] {
+ sql(
+ """
+ | CREATE TABLE target_table
+ | STORED BY 'carbondata'
+ | AS
+ | SELECT t1.city, t2.city
+ | FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
+ """.stripMargin)
+ }
+ // The check is wrapped in assert: a bare contains(...) discards its Boolean result
+ // and would let the test pass regardless of the actual error message.
+ assert(e.getMessage.contains("Duplicated column names found in table definition of " +
+ "`target_table`: [\"city\"]"))
+ // With distinct aliases the same query is valid and the table is created.
+ sql(
+ """
+ | CREATE TABLE target_table
+ | STORED BY 'carbondata'
+ | AS
+ | SELECT t1.city as a, t2.city as b
+ | FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
+ """.stripMargin)
+ checkAnswer(sql("select * from target_table"), Seq(Row("shenzhen", "shenzhen")))
+ }
+
override def afterAll {
sql("DROP TABLE IF EXISTS carbon_ctas_test")
sql("DROP TABLE IF EXISTS parquet_ctas_test")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 5c008f2..4d85e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -119,6 +119,8 @@ object CarbonSparkSqlParserUtil {
case _ =>
// ignore this case
}
+ val columnNames = fields.map(_.name.get)
+ checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames)
if (partitionFields.nonEmpty && options.isStreaming) {
operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
}
@@ -355,16 +357,29 @@ object CarbonSparkSqlParserUtil {
// Ensuring whether no duplicate name is used in table definition
val colNames: Seq[String] = cols.map(_.name)
+ checkIfDuplicateColumnExists(columns, tableIdentifier, colNames)
+ colNames
+ }
+
+ private def checkIfDuplicateColumnExists(columns: ColTypeListContext,
+ tableIdentifier: TableIdentifier,
+ colNames: Seq[String]): Unit = { // reports an error when colNames contains a repeated name
+ if (colNames.length != colNames.distinct.length) { // lengths differ only if some name repeats
+ val duplicateColumns = colNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\"" // keep repeated names, wrapped in double quotes
+ }
- operationNotAllowed(s"Duplicated column names found in table definition of " +
- s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }",
- columns)
+ val errorMessage = s"Duplicated column names found in table definition of " +
+ s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }"
+ // For CREATE TABLE AS SELECT the ColTypeListContext (columns) is null, so there is no
+ // parse context to pass to operationNotAllowed; throw the error directly in that case.
+ if (null != columns) {
+ operationNotAllowed(errorMessage, columns)
+ } else {
+ throw new UnsupportedOperationException(errorMessage)
+ }
+ }
- colNames
+ }
+
/**
* The method return's the storage type
* @param createFileFormat