Posted to commits@carbondata.apache.org by ak...@apache.org on 2022/01/13 05:26:43 UTC

[carbondata] branch master updated: [CARBONDATA-4320] Fix clean files removing wrong delta files

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 05aff87  [CARBONDATA-4320] Fix clean files removing wrong delta files
05aff87 is described below

commit 05aff876d4e7ae7dcea2cecda176b470eb658ff8
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Tue Jan 4 16:38:12 2022 +0530

    [CARBONDATA-4320] Fix clean files removing wrong delta files
    
    Why is this PR needed?
    In the case where there are multiple delete delta files in a partition
    of a partition table, some valid delta files were being missed by the
    validity check and deleted, thus changing the results returned by queries.
    
    What changes were proposed in this PR?
    Fixed the logic that decides which delta files to delete. Clean files
    now compares deltaStartTime with deltaEndTime and, when they differ,
    considers every delta file timestamp recorded for the block, so no
    valid delta file is removed (see the sketch below).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes, one test case has been added.
    
    This closes #4246
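
To make the bug concrete: clean files builds a list of every delete delta
file in the segment, removes from that list the ones it can prove are valid,
and deletes whatever remains. The old loop matched only the
deleteDeltaStartTimestamp, so when a block had accumulated several delta
files the later ones were never removed from the to-delete list. A minimal
standalone sketch of that pre-fix behavior (the file names and timestamps
here are invented for illustration, not the actual CarbonData layout):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the pre-fix filter: three delta files of one block,
    // written at hypothetical timestamps 100, 200 and 300.
    public class OldCleanFilesBug {
      public static void main(String[] args) {
        List<String> totalDeltaFiles = new ArrayList<>(List.of(
            "part-0-100.deletedelta",    // deleteDeltaStartTimestamp = 100
            "part-0-200.deletedelta",
            "part-0-300.deletedelta"));  // deleteDeltaEndTimestamp = 300

        // Old logic: only the file matching the START timestamp is spared...
        totalDeltaFiles.removeIf(f -> f.endsWith("100" + ".deletedelta"));

        // ...so the two remaining VALID files stay in the to-delete list.
        System.out.println("Wrongly deleted: " + totalDeltaFiles);
        // prints: Wrongly deleted: [part-0-200.deletedelta, part-0-300.deletedelta]
      }
    }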
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 20 ++++--
 .../TestCleanFilesCommandPartitionTable.scala      | 83 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index da8d329..f0f23e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -732,11 +732,21 @@ public class CarbonUpdateUtil {
                 .collect(Collectors.toList()));
       }
       SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-      for (SegmentUpdateDetails block : updateDetails) {
-        totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
-                .getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
-                .collect(Collectors.toList()).forEach(fileName -> totalDeltaFiles.remove(fileName));
-      }
+
+      // Case 1: deleteDeltaStartTimestamp equals deleteDeltaEndTimestamp. Only one
+      // delta file is present and deltaFileStamps is null.
+      // Case 2: deleteDeltaStartTimestamp differs from deleteDeltaEndTimestamp. More
+      // than one delta file is present, so the deltaFileStamps variable can be read.
+      Arrays.stream(updateDetails).forEach(block -> {
+        if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
+          totalDeltaFiles.removeIf(deltaFile -> deltaFile.getName().endsWith(block
+              .getDeleteDeltaEndTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+        } else {
+          block.getDeltaFileStamps().stream().forEach(fileName -> totalDeltaFiles
+              .removeIf(deltaFile -> deltaFile.getName().endsWith(fileName +
+              CarbonCommonConstants.DELETE_DELTA_FILE_EXT)));
+        }
+      });
       for (CarbonFile invalidFile: totalDeltaFiles) {
         totalSizeDeleted += invalidFile.getSize();
         filesToBeDeleted.add(invalidFile);
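
Reduced to a standalone sketch, the fixed filter behaves as follows
("Block" below is a hypothetical stand-in for SegmentUpdateDetails, and
".deletedelta" stands in for CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
this is an illustration, not the committed code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    public class FixedCleanFilesSketch {
      // Hypothetical stand-in for SegmentUpdateDetails.
      record Block(String start, String end, Set<String> deltaFileStamps) {}

      static void keepValid(List<String> totalDeltaFiles, Block block) {
        if (block.start().equals(block.end())) {
          // Case 1: exactly one delta file; deltaFileStamps may be null.
          totalDeltaFiles.removeIf(f -> f.endsWith(block.end() + ".deletedelta"));
        } else {
          // Case 2: several delta files; spare every recorded timestamp.
          block.deltaFileStamps().forEach(stamp ->
              totalDeltaFiles.removeIf(f -> f.endsWith(stamp + ".deletedelta")));
        }
      }

      public static void main(String[] args) {
        List<String> files = new ArrayList<>(List.of(
            "part-0-100.deletedelta", "part-0-200.deletedelta",
            "part-0-300.deletedelta"));
        keepValid(files, new Block("100", "300", Set.of("100", "200", "300")));
        System.out.println("Left to delete: " + files); // prints: Left to delete: []
      }
    }
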
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 26f3ccf..14d6da7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -359,6 +359,89 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partition_hc")
   }
 
+  test("test clean files after IUD Horizontal Compaction when" +
+    " CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION > 1") {
+
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION, "3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("drop table if exists origintable")
+
+    sql(
+      """
+        | CREATE TABLE origintable
+        | (id Int,
+        | vin String,
+        | logdate Date,
+        | phonenumber Long,
+        | area String,
+        | salary Int) PARTITIONED BY(country String)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark/src/test/resources/" +
+      s"partition_data_example.csv"
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$testData' into table origintable
+       """)
+
+    sql("delete from origintable where salary = 10000").show()
+    sql("delete from origintable where salary = 10001").show()
+    sql("delete from origintable where salary = 10003").show()
+    var preCleanFiles = sql("select * from origintable").count()
+    sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
+    var postCleanFiles = sql("select * from origintable").count()
+    assert(preCleanFiles == postCleanFiles)
+    sql("delete from origintable where salary = 10005").show()
+
+    // verify if the horizontal compaction happened or not
+    val carbonTable = CarbonEnv.getCarbonTable(None, "origintable")(sqlContext
+      .sparkSession)
+    val partitionPath = carbonTable.getTablePath + "/country=China"
+    val deltaFilesPre = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+      }
+    })
+    assert(deltaFilesPre.size == 5)
+    val updateStatusFilesPre = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
+      .getTablePath)).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+      }
+    })
+    assert(updateStatusFilesPre.size == 3)
+
+    preCleanFiles = sql("select * from origintable").count()
+    sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
+    postCleanFiles = sql("select * from origintable").count()
+    assert(preCleanFiles == postCleanFiles)
+
+    val deltaFilesPost = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+      }
+    })
+    assert(deltaFilesPost.size == 1)
+    val updateStatusFilesPost = FileFactory.getCarbonFile(CarbonTablePath
+      .getMetadataPath(carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+      }
+    })
+    assert(updateStatusFilesPost.size == 1)
+
+    sql("drop table if exists origintable")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants
+      .DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
+        CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
+  }
+
   def editTableStatusFile(carbonTablePath: String) : Unit = {
     // Original Table status file
     val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
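
The added test does its verification by counting files through FileFactory
with CarbonFileFilter implementations. For reference, the same suffix-based
counting can be sketched with plain java.io (the directory path below is
made up; this is not the Carbon API):

    import java.io.File;

    public class CountDeltaFiles {
      public static void main(String[] args) {
        // Hypothetical partition directory of the test table.
        File partitionDir = new File("/tmp/origintable/country=China");
        // Keep only delete delta files, mirroring the test's suffix filter.
        File[] deltaFiles = partitionDir.listFiles(
            (dir, name) -> name.endsWith(".deletedelta"));
        System.out.println(deltaFiles == null ? 0 : deltaFiles.length);
      }
    }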