You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2021/10/26 05:28:42 UTC

[carbondata] branch master updated: [CARBONDATA-4303] Columns mismatch when insert into table with static partition

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dbd2a5  [CARBONDATA-4303] Columns mismatch when insert into table with static partition
9dbd2a5 is described below

commit 9dbd2a59ccc92beba8b8a9d8b7834f0e5b2bba8c
Author: jack86596 <ja...@gmail.com>
AuthorDate: Tue Oct 12 11:22:44 2021 +0800

    [CARBONDATA-4303] Columns mismatch when insert into table with static partition
    
    Why is this PR needed?
    When inserting into a table with a static partition, the source projection
    should not contain the static partition columns, while the target table has
    all columns. The column-count comparison between source and target is
    therefore: source column count = target table column count - static
    partition column count.
    
    What changes were proposed in this PR?
    Before doing the column-count comparison, remove the static partition
    columns from the target table's expected output.
    
    This Closes #4233
---
 .../spark/sql/hive/CarbonAnalysisRules.scala       | 49 ++++++++++++----------
 .../allqueries/InsertIntoCarbonTableTestCase.scala | 18 +++++++-
 .../booleantype/BooleanDataTypesInsertTest.scala   | 38 +----------------
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  |  4 +-
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  |  2 +-
 .../spark/testsuite/merge/MergeTestCase.scala      |  2 +-
 .../StandardPartitionTableDropTestCase.scala       |  4 +-
 7 files changed, 51 insertions(+), 66 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 74bdca8..c324f72 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -271,7 +271,10 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
     val carbonTable = carbonDSRelation.carbonRelation.carbonTable
     val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
     val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
-    val expectedOutput = carbonDSRelation.carbonRelation.output
+    val staticParCols = CarbonToSparkAdapter.getPartitionsFromInsert(p)
+      .filter(_._2.isDefined).keySet.map(_.toLowerCase())
+    val expectedOutput = carbonDSRelation.carbonRelation.output.filterNot(
+      a => staticParCols.contains(a.name.toLowerCase()))
     if (expectedOutput.size > CarbonCommonConstants
       .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
       CarbonException.analysisException(
@@ -291,7 +294,15 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
     }
     // In spark, PreprocessTableInsertion rule has below cast logic.
     // It was missed in carbon when implemented insert into rules.
-    val actualOutput = newLogicalPlan.output
+    if (newLogicalPlan.output.size != expectedOutput.size) {
+      CarbonException.analysisException(
+        s"${carbonTable.getTableName} requires that the data to be inserted " +
+        s"have the same number of columns as the target table: " +
+        s"target table has ${p.table.output.size} column(s) but the " +
+        s"inserted data has ${p.query.output.length + staticParCols.size} column(s), " +
+        s"including ${staticParCols.size} partition column(s) having constant value(s)."
+      )
+    }
     var newChildOutput = newLogicalPlan.output.zip(expectedOutput)
       .map {
         case (actual, expected) =>
@@ -306,30 +317,24 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
             Alias(Cast(actual, expected.dataType), expected.name)(
               explicitMetadata = Option(expected.metadata))
           }
-      } ++ actualOutput.takeRight(actualOutput.size - expectedOutput.size)
-    if (newChildOutput.size >= expectedOutput.size ||
-        carbonDSRelation.carbonTable.isHivePartitionTable) {
-      newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
-        columnWithIndex._1 match {
-          case attr: Attribute =>
-            Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
-          case attr => attr
-        }
       }
-      val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
-        throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
-      } else {
-        Project(newChildOutput, newLogicalPlan)
+    newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
+      columnWithIndex._1 match {
+        case attr: Attribute =>
+          Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
+        case attr => attr
       }
-
-      val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
-
-      InsertIntoCarbonTable(carbonDSRelation, CarbonToSparkAdapter.getPartitionsFromInsert(p),
-        newChild, overwrite, ifNotExists = true, containsMultipleInserts = containsMultipleInserts)
+    }
+    val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
+      throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
     } else {
-      CarbonException.analysisException(
-        "Cannot insert into target table because number of columns mismatch")
+      Project(newChildOutput, newLogicalPlan)
     }
+
+    val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
+
+    InsertIntoCarbonTable(carbonDSRelation, CarbonToSparkAdapter.getPartitionsFromInsert(p),
+      newChild, overwrite, ifNotExists = true, containsMultipleInserts = containsMultipleInserts)
   }
 }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 629ba38..1aa028c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -472,7 +472,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     val e = intercept[Exception] {
       sql("insert into table1 select * from table2")
     }
-    assert(e.getMessage.contains("number of columns are different"))
+    assert(e.getMessage.contains(
+      "requires that the data to be inserted have the same number of columns as the target table"))
   }
 
   test("test insert into partitioned table with int type to double type") {
@@ -486,6 +487,21 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"DROP TABLE IF EXISTS table1")
   }
 
+  test("test insert into partitioned table with static partition") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql("DROP TABLE IF EXISTS select_from")
+    sql("CREATE TABLE select_from (i int, b string) stored as carbondata")
+    sql("CREATE TABLE table1 (i int) partitioned by (a int, b string) stored as carbondata")
+    sql("insert into table select_from select 1, 'a'")
+    sql("insert into table table1 partition(a='100',b) select 1, b from select_from")
+    checkAnswer(
+      sql("select * from table1"),
+      sql("select 1, 100, 'a'")
+    )
+    sql("DROP TABLE IF EXISTS table1")
+    sql("DROP TABLE IF EXISTS select_from")
+  }
+
   test("test loading data into partitioned table with segment's updateDeltaEndTimestamp not change") {
     val tableName = "test_partitioned_table"
     sql(s"drop table if exists $tableName")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
index 1f6507f..a32a5b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala
@@ -81,22 +81,6 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
     )
   }
 
-  test("Inserting and selecting table: create one column boolean table and insert two columns") {
-    // send to old flow, as for one column two values are inserted.
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
-    sql("insert into boolean_one_column values(true,false)")
-    sql("insert into boolean_one_column values(True)")
-    sql("insert into boolean_one_column values(false,true)")
-    checkAnswer(
-      sql("select * from boolean_one_column"),
-      Seq(Row(true), Row(true), Row(false))
-    )
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
-        CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
-  }
-
   test("Inserting and selecting table: two columns boolean and many rows, should support") {
     sql("CREATE TABLE if not exists boolean_table2(" +
         "col1 BOOLEAN, col2 BOOLEAN) STORED AS carbondata")
@@ -441,7 +425,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
       sql("insert into boolean_table2 select * from boolean_table")
     }
     assert(exception_insert.getMessage.contains(
-      "Cannot insert into target table because number of columns mismatch"))
+      "requires that the data to be inserted have the same number of columns as the target table"))
   }
 
   test("Inserting into Hive table from carbon table: support boolean data type and other format") {
@@ -643,26 +627,6 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with
     )
   }
 
-  test("Inserting overwrite: create one column boolean table and insert two columns") {
-    // send to old flow, as for one column two values are inserted.
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
-    sql("insert overwrite table boolean_one_column values(true,false)")
-    checkAnswer(
-      sql("select * from boolean_one_column"),
-      Seq(Row(true))
-    )
-    sql("insert overwrite table boolean_one_column values(True)")
-    sql("insert overwrite table boolean_one_column values(false,true)")
-    checkAnswer(
-      sql("select * from boolean_one_column"),
-      Seq(Row(false))
-    )
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
-        CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
-  }
-
   test("Inserting overwrite: two columns boolean and many rows, should support") {
     sql("CREATE TABLE if not exists boolean_table2(" +
         "col1 BOOLEAN, col2 BOOLEAN) STORED AS carbondata")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index b6d530c..1419fbc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -383,7 +383,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """insert overwrite table deleteinpartition
         | partition (dtm=20200908)
-        | select * from deleteinpartition
+        | select id, sales from deleteinpartition
         | where dtm = 20200907""".stripMargin)
     checkAnswer(
       sql("""select count(1), dtm from deleteinpartition group by dtm"""),
@@ -395,7 +395,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """insert overwrite table deleteinpartition
         | partition (dtm=20200909)
-        | select * from deleteinpartition
+        | select id, sales from deleteinpartition
         | where dtm = 20200907""".stripMargin)
     checkAnswer(
       sql("""select count(1), dtm from deleteinpartition group by dtm"""),
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index fdd50e7..75c7ad7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -127,7 +127,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """insert overwrite table iud.updateinpartition
         | partition (dtm=20200908)
-        | select * from iud.updateinpartition where dtm = 20200907""".stripMargin)
+        | select id, sales from iud.updateinpartition where dtm = 20200907""".stripMargin)
     checkAnswer(
       sql(
         """select sales from iud.updateinpartition
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 7103039..7b7749d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -772,7 +772,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       """insert overwrite table target
         | partition (value=3)
-        | select * from target where value = 100""".stripMargin)
+        | select key from target where value = 100""".stripMargin)
     checkAnswer(sql("select * from target order by key"),
       Seq(Row("c", "200"), Row("e", "100"), Row("e", "3")))
     sql("""alter table target drop partition (value=3)""")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 58075a6..0148d02 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -172,13 +172,13 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
     sql(
       """insert overwrite table droppartition
         | partition (dtm=20200908)
-        | select * from droppartition
+        | select id, sales from droppartition
         | where dtm = 20200907""".stripMargin)
     // insert overwrite an non-existing partition
     sql(
       """insert overwrite table droppartition
         | partition (dtm=20200909)
-        | select * from droppartition
+        | select id, sales from droppartition
         | where dtm = 20200907""".stripMargin)
 
     // make sure drop one partition won't effect other partitions