Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/04/03 06:40:42 UTC

[carbondata] branch master updated: [CARBONDATA-3755] Fix cleanup issue with respect to segmentMetaDataInfo after update and clean files

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e95231  [CARBONDATA-3755] Fix cleanup issue with respect to segmentMetaDataInfo after update and clean files
5e95231 is described below

commit 5e952314cd22b21d4ae8b1c962143d3fded45b40
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Fri Mar 27 00:00:23 2020 +0530

    [CARBONDATA-3755] Fix cleanup issue with respect to segmentMetaDataInfo
    after update and clean files
    
    Why is this PR needed?
    1. segmentMetaDataInfo is not copied to the new segment files written
    after multiple update and clean files operations.
    2. Old segment files are not deleted and keep accumulating.
    
    What changes were proposed in this PR?
    1. Copy the segmentMetaDataInfo over to the new segment files.
    2. Once the new segment file is written, delete the old invalid segment
    files (see the condensed sketch after this message).
    
    This closes #3683
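
For readers skimming the change, the loop below is a condensed, self-contained
sketch of the new flow in CarbonUpdateUtil: it carries the old segment file's
segmentMetaDataInfo into the freshly written segment file and then deletes the
superseded segment files. The names come from the diff that follows; the
wrapper class, the throws clause, and the package chosen for Segment are
assumptions of this sketch, not part of the commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Segment's package is an assumption of this sketch.
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.path.CarbonTablePath;

public final class SegmentFileRewriteSketch {

  // Rewrites the segment file of each updated segment, preserving its
  // segmentMetaDataInfo, then deletes the superseded segment files.
  static List<Segment> rewriteSegmentFiles(CarbonTable table,
      List<Segment> segmentFilesToBeUpdated) throws IOException {
    String uuid = String.valueOf(System.currentTimeMillis());
    List<Segment> updatedSegments = new ArrayList<>();
    CarbonFile segmentFilesLocation = FileFactory.getCarbonFile(
        CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
    for (Segment segment : segmentFilesToBeUpdated) {
      // Read the old segment file so its segmentMetaDataInfo survives the rewrite.
      SegmentFileStore fileStore =
          new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
      segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
      // Write the new segment file with the carried-over metadata.
      final String updatedSegmentFile = SegmentFileStore.writeSegmentFile(
          table, segment.getSegmentNo(), uuid,
          CarbonTablePath.getSegmentPath(table.getTablePath(), segment.getSegmentNo()),
          segment.getSegmentMetaDataInfo());
      updatedSegments.add(new Segment(segment.getSegmentNo(), updatedSegmentFile));
      // Delete every other file in Metadata/segments so stale segment files
      // stop accumulating.
      CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
        @Override
        public boolean accept(CarbonFile file) {
          return !file.getName().equalsIgnoreCase(updatedSegmentFile);
        }
      });
      for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
        invalidSegmentFile.delete();
      }
    }
    return updatedSegments;
  }
}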
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 27 ++++++++++++++++++----
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 11 +++++----
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 397ada6..a1d1e18 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -569,7 +569,8 @@ public class CarbonUpdateUtil {
             }
           }
           if (updateSegmentFile) {
-            segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
+            segmentFilesToBeUpdated.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), null));
           }
         }
         // handle cleanup of merge index files and data files after small files merge happened for
@@ -579,10 +580,28 @@ public class CarbonUpdateUtil {
     }
     String UUID = String.valueOf(System.currentTimeMillis());
     List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
+    CarbonFile segmentFilesLocation =
+        FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
     for (Segment segment : segmentFilesToBeUpdated) {
-      String file =
-          SegmentFileStore.writeSegmentFile(table, segment.getSegmentNo(), UUID);
-      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
+      SegmentFileStore fileStore =
+          new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
+      segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
+      String updatedSegmentFile = SegmentFileStore
+          .writeSegmentFile(table, segment.getSegmentNo(), UUID,
+              CarbonTablePath.getSegmentPath(table.getTablePath(), segment.getSegmentNo()),
+              segment.getSegmentMetaDataInfo());
+      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), updatedSegmentFile));
+
+      // delete the old segment files
+      CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          return !file.getName().equalsIgnoreCase(updatedSegmentFile);
+        }
+      });
+      for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
+        invalidSegmentFile.delete();
+      }
     }
     if (segmentFilesToBeUpdated.size() > 0) {
       updateTableMetadataStatus(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index eb06e33..522924d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -52,7 +52,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
   }
 
-  test("test update operation with 0 rows updation.") {
+  test("test update operation with 0 rows updation and clean files operation") {
     sql("""drop table if exists iud.zerorows""").show
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
@@ -62,9 +62,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       sql("""select c1,c2,c3,c5 from iud.zerorows"""),
       Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
     )
-    sql("""drop table iud.zerorows""").show
-
-
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'e'""").show()
+    sql("clean files for table iud.zerorows")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), "zerorows")(sqlContext.sparkSession)
+    val segmentFileLocation = FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath))
+    assert(segmentFileLocation.listFiles().length == 1)
+    sql("""drop table iud.zerorows""")
   }
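
The updated test asserts that, after an update followed by clean files, exactly
one file remains under Metadata/segments. The same post-condition can be
checked outside the test suite with the core file APIs used above; the snippet
below is a hypothetical stand-alone check, and the table path in it is an
assumed example, not taken from the commit.

import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.path.CarbonTablePath;

public final class SegmentFileCountCheck {
  public static void main(String[] args) {
    // Assumed warehouse location of the iud.zerorows table; adjust to your store.
    String tablePath = "/tmp/carbon.store/iud/zerorows";
    // After the fix, only the latest segment file should remain here.
    int count = FileFactory.getCarbonFile(
        CarbonTablePath.getSegmentFilesLocation(tablePath)).listFiles().length;
    System.out.println("segment files remaining: " + count);
  }
}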