You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/10/07 11:13:15 UTC

[carbondata] branch master updated: [CARBONDATA-4228] [CARBONDATA-4203] Fixed update/delete after alter add segment

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bca62cd  [CARBONDATA-4228] [CARBONDATA-4203] Fixed update/delete after alter add segment
bca62cd is described below

commit bca62cda806302fe10076c8d2f50e6fad95b147a
Author: nihal0107 <ni...@gmail.com>
AuthorDate: Tue Sep 21 18:14:21 2021 +0530

    [CARBONDATA-4228] [CARBONDATA-4203] Fixed update/delete after alter add segment
    
    Why is this PR needed?
    Deleted records are reappearing, or updated records are showing old values, in select
    queries. This happens because, after horizontal compaction, the delete delta file for an
    external segment is written to the default path (Fact\part0\segment_x\), whereas for an
    external segment the delete delta file should instead be written to the path where the
    segment is actually present.
    
    What changes were proposed in this PR?
    After a delete/update operation on the segment, horizontal compaction will be triggered.
    Now, after horizontal compaction of an external segment, the delete delta file will be
    written to the segment's own path instead of the default path.
    
    This closes #4220
---
 .../statusmanager/SegmentUpdateStatusManager.java  | 17 +++++++++--
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 33 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 31e253b..fe40494 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -366,8 +366,18 @@ public class SegmentUpdateStatusManager {
    */
   public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
     List<String> deleteDeltaFileList = new ArrayList<>();
-    String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segment.getSegmentNo());
+    String segmentPath = null;
+    if (segment.isExternalSegment()) {
+      for (LoadMetadataDetails details : segmentDetails) {
+        if (details.getLoadName().equals(segment.getSegmentNo())) {
+          segmentPath = details.getPath();
+          break;
+        }
+      }
+    } else {
+      segmentPath = CarbonTablePath.getSegmentPath(
+              identifier.getTablePath(), segment.getSegmentNo());
+    }
 
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
@@ -375,8 +385,9 @@ public class SegmentUpdateStatusManager {
           !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
         Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
         if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {
+          String finalSegmentPath = segmentPath;
           deltaFileTimestamps.forEach(timestamp -> deleteDeltaFileList.add(
-              CarbonUpdateUtil.getDeleteDeltaFilePath(segmentPath, blockName, timestamp)));
+              CarbonUpdateUtil.getDeleteDeltaFilePath(finalSegmentPath, blockName, timestamp)));
         } else {
           // when the deltaFileTimestamps is null, then there is only one delta file
           // and the SegmentUpdateDetails will have same start and end timestamp,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 72b8134..5d2134f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -86,6 +86,39 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+  test("test update/delete operation on added segment which required horizontal compaction") {
+    sql("drop table if exists uniqdata")
+    sql("""CREATE TABLE  uniqdata(empname String, designation String, doj Timestamp,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+          |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+          |  utilization int,salary int, empno int)
+          | STORED AS carbondata""".stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE uniqdata
+           | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+    val table = CarbonEnv.getCarbonTable(None, "uniqdata")(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    CarbonTestUtil.copy(path, newPath)
+
+    sql(s"Alter table uniqdata add segment options ('path'='$newPath','format'='carbon')")
+    // perform update/delete operation after add of new segment
+    sql("delete from uniqdata where empno=11")
+    sql("update uniqdata set (empname)=('nihal') where empno=12")
+
+    checkAnswer(sql("select empname from uniqdata where empno=12"), Seq(Row("nihal"), Row("nihal")))
+    checkAnswer(sql("select count(*) from uniqdata where empno=11"), Seq(Row(0)))
+
+    sql("set carbon.input.segments.default.uniqdata=1")
+    // after update new data will be present in new segment
+    checkAnswer(sql("select count(*) from uniqdata where empno=12"), Seq(Row(0)))
+    checkAnswer(sql("select count(*) from uniqdata where empno=11"), Seq(Row(0)))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    sql("drop table if exists uniqdata")
+  }
+
   test("test add segment with SI when parent and SI segments are not in sunc") {
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")