You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/09/23 14:18:32 UTC

[carbondata] branch master updated: [CARBONDATA-3520] CTAS should fail if select query contains duplicate columns

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 07f8f79  [CARBONDATA-3520] CTAS should fail if select query contains duplicate columns
07f8f79 is described below

commit 07f8f7942b143ddf9c20a6df337fdce5f681a8a2
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Sep 16 16:25:03 2019 +0530

    [CARBONDATA-3520] CTAS should fail if select query contains duplicate columns
    
    Problem:
    If Select query contains Duplicate columns, CTAS was creating
    a table with only one column, which is wrong
    
    Solution:
    Throw error message if Select query contains duplicate columns.
    
    This closes #3388
---
 .../createTable/TestCreateTableAsSelect.scala      | 37 ++++++++++++++++++++++
 .../sql/parser/CarbonSparkSqlParserUtil.scala      | 23 +++++++++++---
 2 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 3896061..8e4d8fa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -407,6 +407,43 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT * FROM target_table"), Seq(Row("shenzhen", 24.5)))
   }
 
+  test("test duplicate columns with select query") {
+    sql("DROP TABLE IF EXISTS target_table")
+    sql("DROP TABLE IF EXISTS source_table")
+    // create carbon table and insert data
+    sql(
+      """
+        | CREATE TABLE source_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        |     STORED BY 'carbondata'
+        |     """.stripMargin)
+    sql("INSERT INTO source_table SELECT 1,'bob','shenzhen',27")
+    val e = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE TABLE target_table
+          | STORED BY 'carbondata'
+          | AS
+          |   SELECT t1.city, t2.city
+          |   FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
+      """.stripMargin)
+    }
+    e.getMessage().toString.contains("Duplicated column names found in table definition of " +
+                                     "`target_table`: [\"city\"]")
+    sql(
+      """
+        | CREATE TABLE target_table
+        | STORED BY 'carbondata'
+        | AS
+        |   SELECT t1.city as a, t2.city as b
+        |   FROM source_table t1, source_table t2 where t1.city=t2.city and t1.city = 'shenzhen'
+      """.stripMargin)
+    checkAnswer(sql("select * from target_table"), Seq(Row("shenzhen", "shenzhen")))
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS carbon_ctas_test")
     sql("DROP TABLE IF EXISTS parquet_ctas_test")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 5c008f2..4d85e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -119,6 +119,8 @@ object CarbonSparkSqlParserUtil {
       case _ =>
       // ignore this case
     }
+    val columnNames = fields.map(_.name.get)
+    checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames)
     if (partitionFields.nonEmpty && options.isStreaming) {
       operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
     }
@@ -355,16 +357,29 @@ object CarbonSparkSqlParserUtil {
 
     // Ensuring whether no duplicate name is used in table definition
     val colNames: Seq[String] = cols.map(_.name)
+    checkIfDuplicateColumnExists(columns, tableIdentifier, colNames)
+    colNames
+  }
+
+  private def checkIfDuplicateColumnExists(columns: ColTypeListContext,
+      tableIdentifier: TableIdentifier,
+      colNames: Seq[String]): Unit = {
     if (colNames.length != colNames.distinct.length) {
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }
-      operationNotAllowed(s"Duplicated column names found in table definition of " +
-                          s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }",
-        columns)
+      val errorMessage = s"Duplicated column names found in table definition of " +
+                         s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }"
+      // In case of create table as select, ColTypeListContext will be null. Check if
+      // duplicateColumns present in column names list, If yes, throw exception
+      if (null != columns) {
+        operationNotAllowed(errorMessage, columns)
+      } else {
+        throw new UnsupportedOperationException(errorMessage)
+      }
     }
-    colNames
   }
+
   /**
    * The method return's the storage type
    * @param createFileFormat