You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/07/21 02:40:15 UTC
[carbondata] branch master updated: [CARBONDATA-3895] Fix
FileNotFound exception in query after global sort compaction
This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 59390d0 [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
59390d0 is described below
commit 59390d00ecb5531e65129b005409e331f87f9ed7
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Sun Jun 28 12:00:57 2020 +0530
[CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
Why is this PR needed?
After a global sort compaction, if we execute clean files and then run a query or update/delete operations, we get FileNotFound exceptions and some data loss. This is because we form a new load model for the global sort compaction and the fact timestamp is not set on it, so the carbondata files are generated with a timestamp of 0.
What changes were proposed in this PR?
Copy the fact timestamp from the incoming load model and set it on the new load model.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3812
---
.../carbondata/spark/rdd/CarbonTableCompactor.scala | 3 +++
.../testsuite/dataload/TestGlobalSortDataLoad.scala | 19 +++++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index af9a5c1..83d8935 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -424,6 +424,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
// generate LoadModel which can be used global_sort flow
val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
sparkSession, table)
+ // set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
+ outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+ outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
sparkSession,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index acd47e1..6d5af7f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -483,6 +483,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null, null, mutable.WrappedArray.make(Array(null)), Row(null), Map("null" -> "null")))
}
+ test("test global sort compaction, clean files, update delete") {
+ sql("DROP TABLE IF EXISTS carbon_global_sort_update")
+ sql(
+ """
+ | CREATE TABLE carbon_global_sort_update(id INT, name STRING, city STRING, age INT)
+ | STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+ sql("alter table carbon_global_sort_update compact 'major'")
+ sql("clean files for table carbon_global_sort_update")
+ assert(sql("select * from carbon_global_sort_update").count() == 24)
+ val updatedRows = sql("update carbon_global_sort_update d set (id) = (id + 3) where d.name = 'd'").collect()
+ assert(updatedRows.head.get(0) == 2)
+ val deletedRows = sql("delete from carbon_global_sort_update d where d.id = 12").collect()
+ assert(deletedRows.head.get(0) == 2)
+ assert(sql("select * from carbon_global_sort_update").count() == 22)
+ }
+
private def resetConf() {
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)