You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/07/21 02:40:15 UTC

[carbondata] branch master updated: [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 59390d0  [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
59390d0 is described below

commit 59390d00ecb5531e65129b005409e331f87f9ed7
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Sun Jun 28 12:00:57 2020 +0530

    [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
    
    Why is this PR needed?
    After global sort compaction, if we run clean files and then run a query or an update/delete operation, we get FileNotFound exceptions and some data is lost. This is because a new load model is created for global sort compaction and its fact timestamp is not set, so the carbondata files are generated with a timestamp of 0.
    
    What changes were proposed in this PR?
    Copy the fact timestamp from the incoming load model and set it on the new load model used for global sort compaction. The load metadata details are copied over as well.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3812
---
 .../carbondata/spark/rdd/CarbonTableCompactor.scala   |  3 +++
 .../testsuite/dataload/TestGlobalSortDataLoad.scala   | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index af9a5c1..83d8935 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -424,6 +424,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       // generate LoadModel which can be used global_sort flow
       val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
         sparkSession, table)
+      // set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
+      outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+      outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
       outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
       loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
         sparkSession,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index acd47e1..6d5af7f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -483,6 +483,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null, null, mutable.WrappedArray.make(Array(null)), Row(null), Map("null" -> "null")))
   }
 
+  test("test global sort compaction, clean files, update delete") {
+    sql("DROP TABLE IF EXISTS carbon_global_sort_update")
+    sql(
+      """
+        | CREATE TABLE carbon_global_sort_update(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql("alter table carbon_global_sort_update compact 'major'")
+    sql("clean files for table carbon_global_sort_update")
+    assert(sql("select * from carbon_global_sort_update").count() == 24)
+    val updatedRows = sql("update carbon_global_sort_update d set (id) = (id + 3) where d.name = 'd'").collect()
+    assert(updatedRows.head.get(0) == 2)
+    val deletedRows = sql("delete from carbon_global_sort_update d where d.id = 12").collect()
+    assert(deletedRows.head.get(0) == 2)
+    assert(sql("select * from carbon_global_sort_update").count() == 22)
+  }
+  
   private def resetConf() {
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)