Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:04:31 UTC

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #4151: [CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL statement contains multiple inserts

ShreelekhyaG commented on a change in pull request #4151:
URL: https://github.com/apache/carbondata/pull/4151#discussion_r654251072



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")

Review comment:
       Done
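
For context, the statement under test uses Hive's multi-insert form, in which a single FROM clause feeds several INSERT branches. Reformatted for readability (tables and columns exactly as in the test above):

    FROM catalog_returns_5
    INSERT OVERWRITE TABLE catalog_returns_6 PARTITION (cr_returned_date_sk)
      SELECT cr_returned_time_sk, cr_item_sk, cr_returned_date_sk
      WHERE cr_returned_date_sk IS NOT NULL
      DISTRIBUTE BY cr_returned_date_sk
    INSERT OVERWRITE TABLE catalog_returns_6 PARTITION (cr_returned_date_sk)
      SELECT cr_returned_time_sk, cr_item_sk, cr_returned_date_sk
      WHERE cr_returned_date_sk IS NULL
      DISTRIBUTE BY cr_returned_date_sk

Spark parses this form into a Union over the individual InsertIntoTable commands, which is the plan shape the rest of this review deals with.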

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
+    checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))

Review comment:
       Done
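
A note on the expected answer: catalog_returns_5 holds the single row (cr_returned_date_sk=1, cr_returned_time_sk=2, cr_item_sk=3), so only the IS NOT NULL branch inserts anything. Because cr_returned_date_sk is the partition column of catalog_returns_6, it appears last in that table's schema, hence Row(2, 3, 1).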

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+    val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+    // Convert the GenericInternalRow returned by the command into an UnsafeRow
+    val row = unsafeProjection(internalRow.head)
+    Seq(row)
+  }
+
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }

Review comment:
       ok, removed
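
For readers following the conversion in the hunk above, here is a minimal standalone sketch of what the UnsafeProjection step does. The two-column schema and the values are illustrative only, not taken from the PR:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    // A command result arrives as a GenericInternalRow backed by Java objects.
    val generic: InternalRow = InternalRow(UTF8String.fromString("0"), 1)

    // UnsafeProjection re-encodes it into Spark's compact binary UnsafeRow
    // format, which Union and whole-stage-codegen operators require.
    val projection = UnsafeProjection.create(Array[DataType](StringType, IntegerType))
    val unsafe = projection(generic) // now an UnsafeRow

This is why sideEffectResult runs the projection before handing rows back to the parent Union node.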

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +59,14 @@ object DMLStrategy extends SparkStrategy {
       case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
       case insert: InsertIntoCarbonTable =>
-        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        if (insert.isMultipleInserts) {
+          // When multiple insert statements share a single plan (Union),
+          // Spark expects each row to be an UnsafeRow. Use UnionCommandExec
+          // to implement a custom sideEffectResult that returns UnsafeRow.
+          UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil

Review comment:
       Done
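
For clarity, the full branch presumably ends up reading as below; the else path is inferred from the removed line and is a sketch, not a quote of the final patch:

    case insert: InsertIntoCarbonTable =>
      if (insert.isMultipleInserts) {
        // Union plan: the parent operator expects UnsafeRow, so use the
        // custom exec node that projects the command result accordingly.
        UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
      } else {
        // A single insert keeps the original code path.
        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
      }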

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {

Review comment:
       Done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
       case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
       case insert: InsertIntoCarbonTable =>
-        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        if (insert.isMultipleInserts) {
+          // A successful insert in carbon returns the segment ID as a row.

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
##########
@@ -250,13 +251,19 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
 
       case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
         if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation, child)
+        var isMultipleInserts = false

Review comment:
       done
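
A rough sketch of how the flag could be computed inside the case shown above, assuming a multi-insert statement reaches the analyzer as a Union over the individual inserts; the traversal below is hypothetical, since the rest of the hunk is not quoted here:

    // Hypothetical sketch only. A multi-insert query parses to:
    //   Union
    //   :- InsertIntoTable(catalog_returns_6, ... IS NOT NULL ...)
    //   +- InsertIntoTable(catalog_returns_6, ... IS NULL ...)
    // so an insert is "one of multiple" when a Union directly contains it.
    var isMultipleInserts = false
    plan.foreach {
      case u: Union => if (u.children.exists(_ eq p)) isMultipleInserts = true
      case _ =>
    }
    // ...then thread isMultipleInserts through to InsertIntoCarbonTable
    // (details elided in the hunk).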




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org