You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/06/17 08:49:22 UTC

[carbondata] branch master updated: [CARBONDATA-3820] Fix CDC failure when sort columns present in source dataframe

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b8bdcd  [CARBONDATA-3820] Fix CDC failure when sort columns present in source dataframe
5b8bdcd is described below

commit 5b8bdcd44318f938a0588e34417d2afcb76f676e
Author: haomarch <ma...@126.com>
AuthorDate: Tue May 12 17:39:20 2020 +0800

    [CARBONDATA-3820] Fix CDC failure when sort columns present in source dataframe
    
    Why is this PR needed?
    While merging into a table with sort columns in the CDC flow, the following exception is thrown:
    "column: id specified in sort columns does not exist in schema".
    The root cause is that we use the table's TBLPROPERTIES, which include sort_columns, to create the TUPLEID_statusOnMerge carbon writer, for which the sort_columns and sort_scope properties are useless.
    
    What changes were proposed in this PR?
    Remove the sort column properties (by no longer passing the table properties) when creating the TUPLEID_statusOnMerge carbon writer.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3764
---
 .../apache/carbondata/examples/CDCExample.scala    |   2 +
 .../mutation/merge/CarbonMergeDataSetCommand.scala |   4 +-
 .../spark/testsuite/merge/MergeTestCase.scala      | 111 +++++++++++++++++++--
 3 files changed, 109 insertions(+), 8 deletions(-)

diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
index cea17c4..304bf93 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
@@ -141,6 +141,8 @@ object CDCExample {
         .write
         .format("carbondata")
         .option("tableName", "target")
+        .option("sort_scope", "global_sort")
+        .option("sort_column", "id")
         .mode(SaveMode.Overwrite)
         .save()
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 4e6956b..4cee705 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -186,7 +186,7 @@ case class CarbonMergeDataSetCommand(
     CarbonInsertIntoWithDf(
       databaseNameOp = Some(carbonTable.getDatabaseName),
       tableName = carbonTable.getTableName,
-      options = Map(("fileheader" -> header)),
+      options = Map("fileheader" -> header, "sort_scope" -> "nosort"),
       isOverwriteTable = false,
       dataFrame = loadDF.select(tableCols.map(col): _*),
       updateModel = updateTableModel,
@@ -267,7 +267,7 @@ case class CarbonMergeDataSetCommand(
         StructField(status_on_mergeds, IntegerType)))
     val factory =
       new SparkCarbonFileFormat().prepareWrite(sparkSession, job,
-        carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, schema)
+        Map(), schema)
     val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
     (frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
       mapPartitionsWithIndex { case (index, iter) =>
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index c19a132..9246226 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -83,6 +83,57 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     (dwSelframe, odsframe)
   }
 
+  private def initializeGloabalSort = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "global_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
+  private def initializeLocalSort = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "local_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
+  private def initializeNoSortWithSortColumns = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("sort_scope", "no_sort")
+      .option("sort_columns", "id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
   private def initializePartition = {
     val initframe = generateData(10)
     initframe.write
@@ -142,6 +193,54 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
+  test("test basic merge into the globalsort table") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeGloabalSort
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge into the localsort table") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeLocalSort
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge into the nosort table with sortcolumns") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeNoSortWithSortColumns
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
   test("test basic merge update with few mappings with out condition") {
     sql("drop table if exists order")
     val (dwSelframe, odsframe) = initialize
@@ -459,12 +558,12 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists customers")
 
     val initframe =
-    sqlContext.sparkSession.createDataFrame(Seq(
-      Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
-      Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null),
-      Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null),
-      Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
-    ).asJava, StructType(Seq(StructField("customerId", IntegerType), StructField("address", StringType), StructField("current", BooleanType), StructField("effectiveDate", DateType), StructField("endDate", DateType))))
+      sqlContext.sparkSession.createDataFrame(Seq(
+        Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
+        Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null),
+        Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null),
+        Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
+      ).asJava, StructType(Seq(StructField("customerId", IntegerType), StructField("address", StringType), StructField("current", BooleanType), StructField("effectiveDate", DateType), StructField("endDate", DateType))))
     initframe.printSchema()
     initframe.write
       .format("carbondata")