Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/09/15 16:31:37 UTC

[carbondata] branch master updated: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c90f03b  [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios
c90f03b is described below

commit c90f03b3e75e4d0f48e5e8aad5dfedb728b55f9a
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Jul 29 13:50:08 2020 +0530

    [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios
    
    Why is this PR needed?
    Fix multiple issues during compaction and concurrent scenarios
    a) When auto compaction runs or minor compaction is called multiple times, already
    compacted segments were considered for compaction again, overwriting their files
    and segments
    b) Minor/auto compaction should skip segments compacted to level >= 2, but it was
    only skipping level 2 segments
    c) When compaction failed, merge index was still being called unnecessarily
    d) On the executor, when writing the segment file or table status file fails during
    the merge index event, the stale files need to be removed
    e) During partial load cleanup, segment folders were removed but segment metadata
    files were not removed
    f) Some table status lock retry issues
    g) BlockIndex was not checking for an empty partition list
    
    What changes were proposed in this PR?
    a) Auto/minor compaction now considers only SUCCESS or LOAD_PARTIAL_SUCCESS segments
    b) Minor/auto compaction now skips segments compacted to level >= 2 (see the sketch
    after this list)
    c) When compaction fails, merge index is no longer called
    d) On the executor, when writing the segment file or table status file fails during
    the merge index event, the stale files are removed
    e) During partial load cleanup, segment metadata files are also removed
    f) Table status lock acquisition now uses the configured retry count and timeout
    (see the sketch after this list)
    g) BlockIndex now checks for an empty partition list
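
    Fixes (b) and (f) may be easier to follow with a short standalone sketch. The code
    below is illustrative only, not part of the patch: it assumes segment names follow
    the usual "N" / "N.level" pattern, it reuses the CarbonLockUtil/ICarbonLock calls
    that the patch itself uses, and the class and helper names are hypothetical.

        import org.apache.carbondata.core.constants.CarbonCommonConstants;
        import org.apache.carbondata.core.locks.CarbonLockUtil;
        import org.apache.carbondata.core.locks.ICarbonLock;
        import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
        import org.apache.carbondata.core.statusmanager.SegmentStatusManager;

        public final class CompactionFixSketch {

          // fix (b): skip segments already compacted to level >= 2; "2" and "2.1"
          // stay eligible for minor/auto compaction, "2.2", "2.3", ... are skipped
          static boolean isCompactedToLevelTwoOrMore(String segmentName) {
            int dot = segmentName.lastIndexOf('.');
            if (dot < 0) {
              return false; // original, never-compacted segment
            }
            return Integer.parseInt(segmentName.substring(dot + 1)) >= 2;
          }

          // fix (f): acquire the table status lock with the configured retry count
          // and timeout, and fail fast so the caller can report and retry
          static ICarbonLock acquireTableStatusLock(AbsoluteTableIdentifier identifier) {
            int retryCount = CarbonLockUtil.getLockProperty(
                CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
                CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
            int maxTimeout = CarbonLockUtil.getLockProperty(
                CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
                CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
            ICarbonLock lock = new SegmentStatusManager(identifier).getTableStatusLock();
            if (!lock.lockWithRetries(retryCount, maxTimeout)) {
              throw new RuntimeException("Not able to acquire the table status lock");
            }
            return lock; // caller must unlock() in a finally block
          }

          public static void main(String[] args) {
            System.out.println(isCompactedToLevelTwoOrMore("2"));    // false
            System.out.println(isCompactedToLevelTwoOrMore("2.1"));  // false
            System.out.println(isCompactedToLevelTwoOrMore("2.2"));  // true
          }
        }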
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No (the fixes mostly involve concurrent scenarios)
    
    This closes #3871
---
 .../apache/carbondata/core/index/TableIndex.java   |   2 +-
 .../core/indexstore/blockletindex/BlockIndex.java  |   2 +-
 .../core/statusmanager/SegmentStatusManager.java   |  12 ++-
 .../core/writer/CarbonIndexFileMergeWriter.java    |  44 +++++++-
 .../load/CarbonInternalLoaderUtil.java             |  12 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  12 ++-
 .../spark/rdd/CarbonTableCompactor.scala           |  15 ++-
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  13 ++-
 .../loading/TableProcessingOperations.java         | 115 ++++++++++++++-------
 .../processing/merger/CarbonDataMergerUtil.java    |  53 ++++++----
 10 files changed, 205 insertions(+), 75 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index fe5e756..1cb7760 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -607,7 +607,7 @@ public final class TableIndex extends OperationEventListener {
     for (Segment segment : segments) {
       List<CoarseGrainIndex> indexes = defaultIndex.getIndexFactory().getIndexes(segment);
       for (CoarseGrainIndex index : indexes) {
-        if (null != partitions) {
+        if (null != partitions && !partitions.isEmpty()) {
           // if it has partitioned index but there is no partitioned information stored, it means
           // partitions are dropped so return empty list.
           if (index.validatePartitionInfo(partitions)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 31d2ecb..7aef951 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -613,7 +613,7 @@ public class BlockIndex extends CoarseGrainIndex
       Map<String, Long> blockletToRowCountMap = new HashMap<>();
       // if it has partitioned index but there is no partitioned information stored, it means
       // partitions are dropped so return empty list.
-      if (partitions != null) {
+      if (partitions != null && !partitions.isEmpty()) {
         if (validatePartitionInfo(partitions)) {
           return totalRowCount;
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index f53e06f..970187a 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -291,7 +291,7 @@ public class SegmentStatusManager {
    * @return file content, null is file does not exist
    * @throws IOException if IO errors
    */
-  private static String readFileAsString(String tableStatusPath) throws IOException {
+  public static String readFileAsString(String tableStatusPath) throws IOException {
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
@@ -632,7 +632,7 @@ public class SegmentStatusManager {
    * @param content content to write
    * @throws IOException if IO errors
    */
-  private static void writeStringIntoFile(String filePath, String content) throws IOException {
+  public static void writeStringIntoFile(String filePath, String content) throws IOException {
     AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(filePath);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
@@ -1065,8 +1065,14 @@ public class SegmentStatusManager {
             CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
         boolean locked = false;
         try {
+          int retryCount = CarbonLockUtil
+              .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+                  CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+          int maxTimeout = CarbonLockUtil
+              .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+                  CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
           // Update load metadata file after cleaning deleted nodes
-          locked = carbonTableStatusLock.lockWithRetries();
+          locked = carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout);
           if (locked) {
             LOG.info("Table status lock has been successfully acquired.");
             // Again read status and check to verify update required or not.
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 1fbe4cb..6fa2681 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -221,6 +221,7 @@ public class CarbonIndexFileMergeWriter {
     List<PartitionSpec> partitionSpecs = SegmentFileStore
         .getPartitionSpecs(segmentId, table.getTablePath(), SegmentStatusManager
             .readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath())));
+    List<String> mergeIndexFiles = new ArrayList<>();
     for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
       String mergeIndexFile =
           writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue(), segmentId);
@@ -233,15 +234,27 @@ public class CarbonIndexFileMergeWriter {
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + CarbonCommonConstants.UNDERSCORE + uuid + "",
+                  partitionSpec.getPartitions(), true);
+            } catch (Exception ex) {
+              // delete merge index file if created,
+              // keep only index files as segment file writing is failed
+              FileFactory.getCarbonFile(mergeIndexFile).delete();
+              LOGGER.error(
+                  "unable to write segment file during merge index writing: " + ex.getMessage());
+              throw ex;
+            }
           }
         }
       }
@@ -251,9 +264,30 @@ public class CarbonIndexFileMergeWriter {
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
+      String content = SegmentStatusManager.readFileAsString(path);
+      try {
+        SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      } catch (Exception ex) {
+        // delete merge index file if created,
+        // keep only index files as segment file writing is failed
+        for (String mergeIndexFile : mergeIndexFiles) {
+          FileFactory.getCarbonFile(mergeIndexFile).delete();
+        }
+        LOGGER.error("unable to write segment file during merge index writing: " + ex.getMessage());
+        throw ex;
+      }
+      boolean status = SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
           table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
+      if (!status) {
+        // revert to original segment file as the table status update has failed.
+        SegmentStatusManager.writeStringIntoFile(path, content);
+        // delete merge index file.
+        for (String file : mergeIndexFiles) {
+          FileFactory.getCarbonFile(file).delete();
+        }
+        // no need to delete index files, so return from here.
+        return uuid;
+      }
     }
     for (CarbonFile file : indexFiles) {
       file.delete();
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index 02bb465..f0dc758 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -89,7 +90,13 @@ public class CarbonInternalLoaderUtil {
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     try {
-      if (carbonLock.lockWithRetries()) {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
         LOGGER.info("Acquired lock for table" + databaseName + "." + tableName
             + " for table status update");
 
@@ -171,6 +178,9 @@ public class CarbonInternalLoaderUtil {
         LOGGER.error(
             "Not able to acquire the lock for Table status update for table " + databaseName + "."
                 + tableName);
+        throw new RuntimeException(
+            "Not able to acquire the lock for Table status updation for table " + databaseName + "."
+                + tableName);
       }
     } catch (IOException e) {
       LOGGER.error(
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 95c6941..fc3f578 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -268,9 +268,15 @@ object CarbonDataRDDFactory {
           }
         } finally {
           executor.shutdownNow()
-          compactor.deletePartialLoadsInCompaction()
-          if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
-            compactionLock.unlock()
+          try {
+            compactor.deletePartialLoadsInCompaction()
+          } catch {
+            // no need to throw this as compaction is over
+            case ex: Exception =>
+          } finally {
+            if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
+              compactionLock.unlock()
+            }
           }
         }
       }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index f6c1dd5..69ea294 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -347,6 +347,15 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           segmentFileName,
           MVManagerInSpark.get(sc.sparkSession))
 
+      if (!statusFileUpdate) {
+        // no need to call merge index if table status update has failed
+        LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+                     s"${ carbonLoadModel.getTableName }")
+        throw new Exception(s"Compaction failed to update metadata for table" +
+                            s" ${ carbonLoadModel.getDatabaseName }." +
+                            s"${ carbonLoadModel.getTableName }")
+      }
+
       if (compactionType != CompactionType.IUD_DELETE_DELTA &&
           compactionType != CompactionType.IUD_UPDDEL_DELTA) {
         MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
@@ -372,11 +381,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       } else {
         true
       }
-      // here either of the conditions can be true, when delete segment is fired after compaction
-      // has started, statusFileUpdate will be false , but at the same time commitComplete can be
-      // true because compaction for all indexes will be finished at a time to the maximum level
-      // possible (level 1, 2 etc). so we need to check for either condition
-      if (!statusFileUpdate || !commitComplete) {
+      if (!commitComplete) {
         LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         throw new Exception(s"Compaction failed to update metadata for table" +
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index adba486..3c0e22b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{TableBlockInfo, TaskBlockInfo}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.locks.CarbonLockUtil
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.datatype.DataType
@@ -225,7 +226,13 @@ object SecondaryIndexUtil {
           val statusLock =
             new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
           try {
-            if (statusLock.lockWithRetries()) {
+            val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants
+              .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
+            val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants
+              .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
+            if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
               val endTime = System.currentTimeMillis()
               val loadMetadataDetails = SegmentStatusManager
                 .readLoadMetadata(indexCarbonTable.getMetadataPath)
@@ -242,6 +249,10 @@ object SecondaryIndexUtil {
               SegmentStatusManager
                 .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
                   loadMetadataDetails)
+            } else {
+              throw new RuntimeException(
+                "Not able to acquire the lock for table status updation for table " + databaseName +
+                "." + indexCarbonTable.getTableName)
             }
           } finally {
             if (statusLock != null) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index 4aba751..5c41dd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -31,6 +31,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
@@ -64,45 +68,86 @@ public class TableProcessingOperations {
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
-              }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow,
+                  // it should be merged segment and segment metadata doesn't exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata doesn't exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
             }
           }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegmentsId
+                    .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+          if (isCompactionFlow) {
+            LOGGER.error(errorMessage + ", retry compaction");
+            throw new RuntimeException(errorMessage + ", retry compaction");
+          } else {
+            LOGGER.error(errorMessage);
+          }
         }
-      }
-      // delete segment one by one
-      for (CarbonFile staleSegment : staleSegments) {
-        try {
-          CarbonUtil.deleteFoldersAndFiles(staleSegment);
-        } catch (IOException | InterruptedException e) {
-          LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
-        }
+      } finally {
+        carbonTableStatusLock.unlock();
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index da70164..e873eef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -311,7 +312,13 @@ public final class CarbonDataMergerUtil {
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
     try {
-      if (carbonLock.lockWithRetries()) {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
         LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
             + carbonLoadModel.getTableName() + " for table status updation ");
 
@@ -795,29 +802,35 @@ public final class CarbonDataMergerUtil {
       }
       String segName = segment.getLoadName();
 
-      // if a segment is already merged 2 levels then it s name will become .2
+      // if a segment is already merged 2 or more levels (possible from custom compaction),
       // need to exclude those segments from minor compaction.
       // if a segment is major compacted then should not be considered for minor.
-      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
-          segment.isMajorCompacted() != null && segment.isMajorCompacted()
-              .equalsIgnoreCase("true"))) {
+      boolean isMoreThanOrEqualsToLevel2 = false;
+      if (segName.contains(".")) {
+        if (Integer.parseInt(segName.substring(segName.lastIndexOf(".") + 1)) >= 2) {
+          isMoreThanOrEqualsToLevel2 = true;
+        }
+      }
+      if (isMoreThanOrEqualsToLevel2 || (segment.isMajorCompacted() != null && segment
+          .isMajorCompacted().equalsIgnoreCase("true"))) {
         continue;
       }
-
-      // check if the segment is merged or not
-
-      if (!isMergedSegment(segName)) {
-        //if it is an unmerged segment then increment counter
-        unMergeCounter++;
-        unMergedSegments.add(segment);
-        if (unMergeCounter == (level1Size)) {
-          return unMergedSegments;
-        }
-      } else {
-        mergeCounter++;
-        mergedSegments.add(segment);
-        if (mergeCounter == (level2Size)) {
-          return mergedSegments;
+      // check if the segment is merged or not, consider only non-compacted segments for merge.
+      if ((segment.getSegmentStatus() == SegmentStatus.SUCCESS) || (segment.getSegmentStatus()
+          == SegmentStatus.LOAD_PARTIAL_SUCCESS)) {
+        if (!isMergedSegment(segName)) {
+          //if it is an unmerged segment then increment counter
+          unMergeCounter++;
+          unMergedSegments.add(segment);
+          if (unMergeCounter == (level1Size)) {
+            return unMergedSegments;
+          }
+        } else {
+          mergeCounter++;
+          mergedSegments.add(segment);
+          if (mergeCounter == (level2Size)) {
+            return mergedSegments;
+          }
         }
       }
     }