Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/12/22 13:21:23 UTC

[carbondata] branch master updated: [CARBONDATA-4316]Fix horizontal compaction failure for partition tables

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d629dc0  [CARBONDATA-4316]Fix horizontal compaction failure for partition tables
d629dc0 is described below

commit d629dc0b894a64bfbef762736775a182e40827fe
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Tue Dec 7 18:55:13 2021 +0530

    [CARBONDATA-4316]Fix horizontal compaction failure for partition tables
    
    Why is this PR needed?
    Horizontal compaction fails for partition tables, which leaves many
    delete delta files for a single block and slows down query performance.
    It fails because, during horizontal compaction, the delete delta file
    path prepared for a partition table is wrong: the path cannot be
    resolved, so the operation fails.
    
    What changes were proposed in this PR?
    If it is a partition table, read the segment file and identify the
    partition containing the block, so that the proper partition path can
    be prepared (see the sketch below).
    
    This closes #4240
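    
    In essence, the fix resolves the partition directory from the segment
    file instead of assuming a flat segment path. Below is a minimal
    sketch of that lookup, reusing the SegmentFileStore calls that appear
    in the diff; the helper name resolvePartitionPath is hypothetical and
    not part of this commit, and the imports are those added in the diff.
    
        private static String resolvePartitionPath(String tablePath,
            String segmentFileName, String blockName) throws IOException {
          // Read the segment file to learn which partition holds each block.
          SegmentFileStore store = new SegmentFileStore(tablePath, segmentFileName);
          store.readIndexFiles(SegmentStatus.SUCCESS, false,
              FileFactory.getConfiguration());
          // getIndexFilesMap() maps each index file to the block files it covers.
          for (Map.Entry<String, List<String>> entry :
              store.getIndexFilesMap().entrySet()) {
            for (String blockFile : entry.getValue()) {
              String fileName =
                  blockFile.substring(blockFile.lastIndexOf(File.separator) + 1);
              if (blockName.equalsIgnoreCase(CarbonUpdateUtil.getBlockName(fileName))) {
                // The block's parent directory is the partition path,
                // e.g. <tablePath>/c3=aa for the test table added below.
                return blockFile.substring(0, blockFile.lastIndexOf(File.separator));
              }
            }
          }
          return null;
        }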
---
 .../statusmanager/SegmentUpdateStatusManager.java  | 24 ++++++++++++++-
 .../mutation/CarbonProjectForDeleteCommand.scala   |  9 ++++--
 .../command/mutation/HorizontalCompaction.scala    |  4 +--
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  | 34 ++++++++++++++++++++++
 .../processing/merger/CarbonDataMergerUtil.java    |  9 +++---
 5 files changed, 69 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index fe40494..8572ae5 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.statusmanager;
 
 import java.io.*;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
@@ -364,7 +366,8 @@ public class SegmentUpdateStatusManager {
    * @param blockName the specified block of the segment
    * @return delete delta file list of the block
    */
-  public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment segment, final String blockName)
+      throws IOException {
     List<String> deleteDeltaFileList = new ArrayList<>();
     String segmentPath = null;
     if (segment.isExternalSegment()) {
@@ -374,6 +377,25 @@ public class SegmentUpdateStatusManager {
           break;
         }
       }
+    } else if (isPartitionTable) {
+      String segmentFileName = Arrays.stream(segmentDetails).filter(
+          loadMetaDataDetail -> loadMetaDataDetail.getLoadName()
+              .equalsIgnoreCase(segment.getSegmentNo())).collect(Collectors.toList()).get(0)
+          .getSegmentFile();
+      SegmentFileStore segmentFileStore =
+          new SegmentFileStore(identifier.getTablePath(), segmentFileName);
+      segmentFileStore.readIndexFiles(SegmentStatus.SUCCESS, false, FileFactory.getConfiguration());
+      for (Map.Entry<String, List<String>> entry : segmentFileStore.getIndexFilesMap().entrySet()) {
+        List<String> matchedBlocksInPartition = entry.getValue().stream().filter(blockFile -> {
+          String blockFileName = blockFile.substring(blockFile.lastIndexOf(File.separator) + 1);
+          return blockName.equalsIgnoreCase(CarbonUpdateUtil.getBlockName(blockFileName));
+        }).collect(Collectors.toList());
+        if (matchedBlocksInPartition.size() > 0) {
+          segmentPath = matchedBlocksInPartition.get(0)
+              .substring(0, matchedBlocksInPartition.get(0).lastIndexOf(File.separator));
+          break;
+        }
+      }
     } else {
       segmentPath = CarbonTablePath.getSegmentPath(
               identifier.getTablePath(), segment.getSegmentNo());
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 87b1e41..7d54186 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -94,6 +94,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
         LockUsage.UPDATE_LOCK)
     var lockStatus = false
     var hasException = false
+    var deletedRows = 0L;
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -118,6 +119,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
         isUpdateOperation = false,
         executorErrors)
 
+      deletedRows = deletedRowCount;
+
       // Check for any failures occurred during delete delta execution
       if (executorErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executorErrors.errorMsg)
@@ -141,14 +144,16 @@ private[sql] case class CarbonProjectForDeleteCommand(
       val deleteFromTablePostEvent: DeleteFromTablePostEvent =
         DeleteFromTablePostEvent(sparkSession, carbonTable)
       OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
-      Seq(Row(deletedRowCount))
+      Seq(Row(deletedRows))
     } catch {
       case e: HorizontalCompactionException =>
         LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
                      " Please check logs. " + e.getMessage)
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
         hasException = true
-        Seq(Row(0L))
+        // if just the horizontal compaction fails, return the deleted count as it will be
+        // successful.
+        Seq(Row(deletedRows))
 
       case e: Exception =>
         LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 6ff3107..9ed2d10 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -112,9 +112,7 @@ object HorizontalCompaction {
     val db = carbonTable.getDatabaseName
     val table = carbonTable.getTableName
     val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
-      absTableIdentifier,
-      segmentUpdateStatusManager,
-      compactionTypeIUD)
+      segmentUpdateStatusManager)
     if (LOG.isDebugEnabled) {
       LOG.debug(s"The segment list for Horizontal Update Compaction is $deletedBlocksList")
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 1419fbc..4764083 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -468,6 +468,40 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop table if exists ${ tableName }").collect()
   }
 
+  test("test partition table delete and horizontal compaction") {
+    sql("drop table if exists iud_db.partition_hc")
+    sql(
+      "create table iud_db.partition_hc (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) " +
+      "STORED AS carbondata")
+    sql(
+      "insert into iud_db.partition_hc values ('a',1,'aaa','aa'),('a',5,'aaa','aa'),('a',9,'aaa'," +
+      "'aa'),('a',4,'aaa','aa'),('a',2,'aaa','aa'),('a',3,'aaa'," +
+      "'aa')")
+    sql("delete from iud_db.partition_hc where c2 = 1").show()
+    sql("delete from iud_db.partition_hc where c2 = 5").show()
+    checkAnswer(
+      sql("""select c2 from iud_db.partition_hc"""),
+      Seq(Row(9), Row(4), Row(2), Row(3))
+    )
+    // verify if the horizontal compaction happened or not
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud_db"), "partition_hc")(sqlContext
+      .sparkSession)
+    val partitionPath = carbonTable.getTablePath + "/c3=aa"
+    val deltaFiles = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+      }
+    })
+    assert(deltaFiles.size == 3)
+    val updateStatusFiles = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
+      .getTablePath)).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+      }
+    })
+    assert(updateStatusFiles.size == 3)
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud_db cascade")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 955469d..3fe78e6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -875,13 +875,11 @@ public final class CarbonDataMergerUtil {
   /**
    * method gets the segments list which get qualified for IUD compaction.
    * @param segments
-   * @param absoluteTableIdentifier
-   * @param compactionTypeIUD
    * @return
    */
   public static List<String> getSegListIUDCompactionQualified(List<Segment> segments,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager)
+      throws IOException {
 
     List<String> validSegments = new ArrayList<>();
 
@@ -906,7 +904,8 @@ public final class CarbonDataMergerUtil {
    * @return block list of the segment
    */
   private static List<String> checkDeleteDeltaFilesInSeg(Segment seg,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold)
+      throws IOException {
 
     List<String> blockLists = new ArrayList<>();
     Set<String> uniqueBlocks = new HashSet<String>();