Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2022/01/05 14:41:01 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4246: [WIP] Fix clean files removing wrong delta files

akashrn5 commented on a change in pull request #4246:
URL: https://github.com/apache/carbondata/pull/4246#discussion_r778866192



##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -733,9 +733,21 @@ public static long cleanUpDeltaFiles(CarbonTable table, boolean isDryRun) throws
       }
       SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
       for (SegmentUpdateDetails block : updateDetails) {
-        totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
-                .getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
-                .collect(Collectors.toList()).forEach(fileName -> totalDeltaFiles.remove(fileName));
+        // Case 1: When deleteDeltaStartTimestamp = deleteDeltaEndTimestamp. in this case only 1
+        // delta file is present and deltaFileStamps is NULL
+        // Case 2: When deleteDeltaStartTimestamp != deleteDeltaEndTimestamp. in thios case more

Review comment:
       ```suggestion
           // Case 2: When deleteDeltaStartTimestamp != deleteDeltaEndTimestamp. in this case more
       ```

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -359,6 +359,88 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partition_hc")
   }
 
+  test("test clean files after IUD Horizontal Compaction when" +
+    " CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION > 1") {
+
+    CarbonProperties.getInstance().
+      addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION, "3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("drop table if exists origintable")
+
+    sql(
+      """
+        | CREATE TABLE origintable
+        | (id Int,
+        | vin String,
+        | logdate Date,
+        | phonenumber Long,
+        | area String,
+        | salary Int) PARTITIONED BY(country String)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark/src/test/resources/" +
+      s"partition_data_example.csv"
+
+    sql(
+      s"""
+       LOAD DATA LOCAL INPATH '$testData' into table origintable
+       """)
+
+    sql("delete from origintable where salary = 10000").show()
+    sql("delete from origintable where salary = 10001").show()
+    sql("delete from origintable where salary = 10003").show()
+    var preCleanFiles = sql("select * from origintable").count()
+    sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
+    var postCleanFiles = sql("select * from origintable").count()
+    assert(preCleanFiles == postCleanFiles)
+    sql("delete from origintable where salary = 10005").show()
+
+    // verify if the horizontal compaction happened or not
+    val carbonTable = CarbonEnv.getCarbonTable(None, "origintable")(sqlContext
+      .sparkSession)
+    val partitionPath = carbonTable.getTablePath + "/country=China"
+    val deltaFilesPre = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+      }
+    })
+    assert(deltaFilesPre.size == 5)
+    val updateStatusFilesPre = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
+      .getTablePath)).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+      }
+    })
+    assert(updateStatusFilesPre.size == 3)
+
+    preCleanFiles = sql("select * from origintable").count()
+    sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
+    postCleanFiles = sql("select * from origintable").count()
+    assert(preCleanFiles == postCleanFiles)
+
+    val deltaFilesPost = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+      }
+    })
+    assert(deltaFilesPost.size == 1)
+    val updateStatusFilesPost = FileFactory.getCarbonFile(CarbonTablePath
+      .getMetadataPath(carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+      }
+    })
+    assert(updateStatusFilesPost.size == 1)
+
+    sql("drop table if exists origintable")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)

Review comment:
       Set it back to the default value instead of removing the property.
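
       A minimal sketch of that suggestion in the test's own Scala style. The default-value constant named below is an assumption, not confirmed in this thread; substitute the documented default string if no such constant exists:

       ```scala
       // Restore the documented default instead of removing the property, so
       // later tests run against a known threshold value.
       // NOTE: DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION is an
       // assumed constant name; use the literal documented default if absent.
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
           CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
       ```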

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -733,9 +733,21 @@ public static long cleanUpDeltaFiles(CarbonTable table, boolean isDryRun) throws
       }
       SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
       for (SegmentUpdateDetails block : updateDetails) {
-        totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
-                .getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
-                .collect(Collectors.toList()).forEach(fileName -> totalDeltaFiles.remove(fileName));
+        // Case 1: When deleteDeltaStartTimestamp = deleteDeltaEndTimestamp. in this case only 1
+        // delta file is present and deltaFileStamps is NULL
+        // Case 2: When deleteDeltaStartTimestamp != deleteDeltaEndTimestamp. in thios case more
+        // than 1 delta files are present, then can blindly read deltaFilesStamps variable
+        if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
+          totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
+                  .getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+                  .collect(Collectors.toList()).forEach(totalDeltaFiles::remove);
+        } else {
+          for (String deltaFile: block.getDeltaFileStamps()) {
+            totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(
+                    deltaFile + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+                    .collect(Collectors.toList()).forEach(totalDeltaFiles::remove);

Review comment:
       For every `deltaFile`, you are iterating over `totalDeltaFiles` to find and remove the matching file, which adds time complexity. Instead, traverse `totalDeltaFiles` once, filter against `block.getDeltaFileStamps()`, and remove all matches in one shot.
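
       A minimal sketch of the one-pass approach described above, assuming `totalDeltaFiles` is a mutable `List<CarbonFile>` and `block.getDeltaFileStamps()` returns a collection of timestamp strings (both inferred from the surrounding diff, not confirmed here; requires `java.util.Set` and `java.util.stream.Collectors` imports):

       ```java
       // Build the expected delete-delta file suffixes once, up front.
       Set<String> deltaSuffixes = block.getDeltaFileStamps().stream()
           .map(stamp -> stamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
           .collect(Collectors.toSet());
       // Single traversal of totalDeltaFiles: drop every file whose name ends
       // with one of the known suffixes, instead of rescanning and mutating
       // the list once per delta file stamp.
       totalDeltaFiles.removeIf(file ->
           deltaSuffixes.stream().anyMatch(suffix -> file.getName().endsWith(suffix)));
       ```

       This keeps the `endsWith` matching of the original code but replaces the per-stamp filter/collect/remove round trips with one `removeIf` pass.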




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org