You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/10/08 07:01:47 UTC

[carbondata] branch master updated: [CARBONDATA-4019]Fix CDC merge failure join expression made of AND/OR expressions.

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d196fbd  [CARBONDATA-4019]Fix CDC merge failure join expression made of AND/OR expressions.
d196fbd is described below

commit d196fbdbadb376054afdcbf13181e78c5e599763
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon Sep 28 20:28:49 2020 +0530

    [CARBONDATA-4019]Fix CDC merge failure join expression made of AND/OR expressions.
    
    Why is this PR needed?
    1. In CDC, when the join expression contains AND/OR expressions, the merge fails with a cast exception.
    2. When multiple columns are present in the join condition, not all of them are checked against
    the bucket columns when deciding whether to repartition.
    What changes were proposed in this PR?
    1. Instead of directly casting to an EqualTo expression to get the join column, collect all attributes.
    2. Check whether all join columns are present in the bucket columns, and only then repartition the data.
    
    This closes #3961
---
 .../mutation/merge/CarbonMergeDataSetCommand.scala | 24 +++++++++----
 .../spark/testsuite/merge/MergeTestCase.scala      | 40 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 2afdb1a..ec55ea8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -106,18 +106,28 @@ case class CarbonMergeDataSetCommand(
     // decide join type based on match conditions
     val joinType = decideJoinType
 
-    val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
-      .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
-    // repartition the srsDs, if the target has bucketing and the bucketing column and join column
-    // are same
+    val joinColumns = mergeMatches.joinExpr.expr.collect {
+      case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
+        // Suppose the join condition is something like A.id = B.id. It is then an EqualTo
+        // expression whose operands are unresolved attributes such as UnresolvedAttribute(A.id).
+        // Since only the column name is needed here, every UnresolvedAttribute found in the
+        // join expression is collected. The nameParts of an UnresolvedAttribute is a sequence
+        // containing "A" and "id"; since "id" is the column name, nameParts.tail.head is taken,
+        // which yields the "id" column name.
+        unresolvedAttribute.nameParts.tail.head
+    }.distinct
+
+    // repartition the srcDS if the target has bucketing and the bucketing columns contain the
+    // join columns
     val repartitionedSrcDs =
       if (carbonTable.getBucketingInfo != null &&
           carbonTable.getBucketingInfo
             .getListOfColumns
             .asScala
-            .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) {
-      srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, srcDS.col(joinColumn))
-    } else {
+            .map(_.getColumnName).containsSlice(joinColumns)) {
+        srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+          joinColumns.map(srcDS.col): _*)
+      } else {
       srcDS
     }
     // Add the getTupleId() udf to get the tuple id to generate delete delta.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 56b98a3..829c815 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -91,8 +91,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     (dwSelframe, odsframe)
   }
 
-  private def initializeWithBucketing = {
-    sql("create table order(id string, name string, c_name string, quantity int, price int, state int) stored as carbondata tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='id')")
+  private def initializeWithBucketing(bucketingColumns: Seq[String]) = {
+    sql(s"create table order(id string, name string, c_name string, quantity int, price int, " +
+        s"state int) stored as carbondata tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='${
+      bucketingColumns.mkString(",")
+    }')")
     initialize
   }
 
@@ -844,7 +847,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test merge update and insert with condition and expression and delete action with target table as bucketing") {
     sql("drop table if exists order")
-    val (dwSelframe, odsframe) = initializeWithBucketing
+    val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id"))
 
     var matches = Seq.empty[MergeMatch]
     val updateMap = Map(col("id") -> col("A.id"),
@@ -872,6 +875,37 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500)))
   }
 
+  test("test merge with target table as multiple bucketing columns and join columns") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeWithBucketing(Seq("id", "quantity"))
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+    matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches((col("A.id").equalTo(col("B.id"))).and(col("A.quantity").equalTo(col(
+        "B.quantity"))), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500)))
+  }
+
   case class Target (id: Int, value: String, remark: String, mdt: String)
   case class Change (id: Int, value: String, change_type: String, mdt: String)
   private val numInitialRows = 10