You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/07/24 12:58:15 UTC

[carbondata] branch master updated: [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 30eefe3  [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency
30eefe3 is described below

commit 30eefe3d226e35bd7c9ed0f1c4c77c5b22809b76
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon Jul 20 14:50:49 2020 +0530

    [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency
    
    Why is this PR needed?
    When load and compaction are happening concurrently, in reliability test segment data will be deleted from SI table, which leads to exception/failures
    pre-priming was happening for SI table segment in case of compaction before making SI segment as a success.
    
    What changes were proposed in this PR?
    remove unnecessary cleaning API call from SI flow and before compaction success segment locks were getting released for SI, handle that
    do the code refactoring in case of SI load after main table compaction to handle proper pre-priming after segments were made success.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No(tested in cluster with 10 concurrency and around 1000 loads)
    
    This closes #3854
---
 .../org/apache/carbondata/core/index/Segment.java  |  5 ++
 .../spark/rdd/CarbonTableCompactor.scala           | 15 +++++
 .../events/CleanFilesPostEventListener.scala       | 68 +++++++++++++++++++++-
 .../SILoadEventListenerForFailedSegments.scala     | 23 +++++++-
 .../spark/sql/secondaryindex/load/Compactor.scala  | 28 ++++++++-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 59 +++++++++++--------
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  4 +-
 .../processing/merger/CarbonDataMergerUtil.java    |  6 +-
 8 files changed, 176 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 8fb22bc..202a7d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -275,6 +275,11 @@ public class Segment implements Serializable, Writable {
     return null;
   }
 
+  public static Segment getSegment(String segmentNo, String segmentFileName,
+      ReadCommittedScope readCommittedScope) {
+    return new Segment(segmentNo, segmentFileName, readCommittedScope);
+  }
+
   public Configuration getConfiguration() {
     return readCommittedScope.getConfiguration();
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 83d8935..3187f14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -93,6 +95,15 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       deletePartialLoadsInCompaction()
       val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+      var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+      loadsToMerge.asScala.foreach { segmentId =>
+        val segmentLock = CarbonLockFactory
+          .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+            .getAbsoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
+        segmentLock.lockWithRetries()
+        segmentLocks += segmentLock
+      }
       try {
         scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
       } catch {
@@ -117,6 +128,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                 Array(compactedLoadToClear))
           }
           throw e
+      } finally {
+        segmentLocks.foreach { segmentLock =>
+          segmentLock.unlock()
+        }
       }
 
       // scan again and determine if anything is there to merge again.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index fa1e666..97f7836 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -24,11 +24,18 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}
 
 class CleanFilesPostEventListener extends OperationEventListener with Logging {
@@ -54,7 +61,66 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
           SegmentStatusManager.deleteLoadsAndUpdateMetadata(
             indexTable, true, partitions.map(_.asJava).orNull)
           CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
+          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
         }
     }
   }
+
+  /**
+   * This method added to clean the segments which are success in SI and may be compacted or marked
+   * for delete in main table, which can happen in case of concurrent scenarios.
+   */
+  def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
+      mainTable: CarbonTable): Unit = {
+    val mainTableStatusLock: ICarbonLock = CarbonLockFactory
+      .getCarbonLockObj(mainTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val indexTableStatusLock: ICarbonLock = CarbonLockFactory
+      .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    var mainTableLocked = false
+    var indexTableLocked = false
+    try {
+      mainTableLocked = mainTableStatusLock.lockWithRetries()
+      indexTableLocked = indexTableStatusLock.lockWithRetries()
+      if (mainTableLocked && indexTableLocked) {
+        val mainTableMetadataDetails =
+          SegmentStatusManager.readLoadMetadata(mainTable.getMetadataPath).toSet ++
+          SegmentStatusManager.readLoadHistoryMetadata(mainTable.getMetadataPath).toSet
+        val indexTableMetadataDetails =
+          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath).toSet
+        val segToStatusMap = mainTableMetadataDetails
+          .map(detail => detail.getLoadName -> detail.getSegmentStatus).toMap
+
+        val unnecessarySegmentsOfSI = indexTableMetadataDetails.filter { indexDetail =>
+          indexDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
+          segToStatusMap.contains(indexDetail.getLoadName) &&
+          (segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.COMPACTED) ||
+           segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.MARKED_FOR_DELETE))
+        }
+        LOGGER.info(s"Unwanted SI segments are: $unnecessarySegmentsOfSI")
+        unnecessarySegmentsOfSI.foreach { detail =>
+          val carbonFile = FileFactory
+            .getCarbonFile(CarbonTablePath
+              .getSegmentPath(indexTable.getTablePath, detail.getLoadName))
+          CarbonUtil.deleteFoldersAndFiles(carbonFile)
+        }
+        unnecessarySegmentsOfSI.foreach { detail =>
+          detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
+          detail.setVisibility("false")
+        }
+
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
+          unnecessarySegmentsOfSI.toArray)
+      } else {
+        LOGGER.error("Unable to get the lock file for main/Index table. Please try again later")
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("clean up of unwanted SI segments failed", ex)
+      // ignore the exception
+    } finally {
+      indexTableStatusLock.unlock()
+      mainTableStatusLock.unlock()
+    }
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index cae33ed..71c6559 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.secondaryindex.events
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
@@ -33,7 +34,7 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -91,6 +92,7 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                   SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
                 val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
                   SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+                var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
                 if (!isLoadSIForFailedSegments
                     || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
                   mainTblLoadMetadataDetails,
@@ -166,8 +168,19 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                         if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
                             mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
                           detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                          LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
-                          failedLoadMetadataDetails.add(detail(0))
+                          // in concurrent scenario, if a compaction is going on table, then SI
+                          // segments are updated first in table status and then the main table
+                          // segment, so in any load runs paralley this listener shouldn't consider
+                          // those segments accidentally. So try to take the segment lock.
+                          val segmentLockOfProbableOngngCompactionSeg = CarbonLockFactory
+                            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                              CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                              LockUsage.LOCK)
+                          if (segmentLockOfProbableOngngCompactionSeg.lockWithRetries()) {
+                            segmentLocks += segmentLockOfProbableOngngCompactionSeg
+                            LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
+                            failedLoadMetadataDetails.add(detail(0))
+                          }
                         }
                       }
                     })
@@ -221,6 +234,10 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                       LOGGER.error(s"Load to SI table to $indexTableName is failed " +
                                s"or SI table ENABLE is failed. ", ex)
                       return
+                  } finally {
+                    segmentLocks.foreach {
+                      segmentLock => segmentLock.unlock()
+                    }
                   }
                 }
             }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index 39b3d94..7f4ae00 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -19,19 +19,25 @@ package org.apache.spark.sql.secondaryindex.load
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
+import org.apache.spark.sql.secondaryindex.events.LoadTableSIPostExecutionEvent
 import org.apache.spark.sql.secondaryindex.rdd.SecondaryIndexCreator
 import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.ICarbonLock
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.events.OperationListenerBus
+import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 object Compactor {
@@ -60,6 +66,7 @@ object Compactor {
     } else {
       java.util.Collections.emptyIterator()
     }
+    var allSegmentsLock: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer.empty
     while (iterator.hasNext) {
       val index = iterator.next()
       val indexColumns = index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList
@@ -76,11 +83,12 @@ object Compactor {
       try {
         val segmentToSegmentTimestampMap: util.Map[String, String] = new java.util
         .HashMap[String, String]()
-        val indexCarbonTable = SecondaryIndexCreator
+        val (indexCarbonTable, segmentLocks, operationContext) = SecondaryIndexCreator
           .createSecondaryIndex(secondaryIndexModel,
             segmentToSegmentTimestampMap, null,
             forceAccessSegment, isCompactionCall = true,
             isLoadToFailedSISegments = false)
+        allSegmentsLock ++= segmentLocks
         CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
           indexCarbonTable,
           loadsToMerge,
@@ -122,6 +130,19 @@ object Compactor {
           segmentIdToLoadStartTimeMapping(validSegments.head),
           SegmentStatus.SUCCESS,
           carbonLoadModelForMergeDataFiles.getFactTimeStamp, rebuiltSegments.toList.asJava)
+
+        // Index PrePriming for SI
+        DistributedRDDUtils.triggerPrepriming(sparkSession, indexCarbonTable, Seq(),
+          operationContext, FileFactory.getConfiguration, validSegments)
+
+        val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
+          LoadTableSIPostExecutionEvent(sparkSession,
+            indexCarbonTable.getCarbonTableIdentifier,
+            secondaryIndexModel.carbonLoadModel,
+            indexCarbonTable)
+        OperationListenerBus.getInstance
+          .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+
         siCompactionIndexList ::= indexCarbonTable
       } catch {
         case ex: Exception =>
@@ -136,6 +157,11 @@ object Compactor {
                """.stripMargin).collect()
           }
           throw ex
+      } finally {
+        // once compaction is success, release the segment locks
+        allSegmentsLock.foreach { segmentLock =>
+          segmentLock.unlock()
+        }
       }
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 04cfbb0..e897051 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -59,7 +59,7 @@ object SecondaryIndexCreator {
     indexTable: CarbonTable,
     forceAccessSegment: Boolean = false,
     isCompactionCall: Boolean,
-    isLoadToFailedSISegments: Boolean): CarbonTable = {
+    isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
     var indexCarbonTable = indexTable
     val sc = secondaryIndexModel.sqlContext
     // get the thread pool size for secondary index creation
@@ -76,7 +76,7 @@ object SecondaryIndexCreator {
       indexCarbonTable = metastore
         .lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
           secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
-          .sparkSession).asInstanceOf[CarbonRelation].carbonTable
+          .sparkSession).carbonTable
     }
 
     val operationContext = new OperationContext
@@ -119,6 +119,9 @@ object SecondaryIndexCreator {
       }
 
       validSegmentList = validSegments.asScala.toList
+      if (validSegmentList.isEmpty) {
+        return (indexCarbonTable, segmentLocks, operationContext)
+      }
 
       LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
               s"for segments: $validSegmentList")
@@ -266,13 +269,15 @@ object SecondaryIndexCreator {
           rebuiltSegments)
       }
 
-      // Index PrePriming for SI
-      DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
-        indexCarbonTable,
-        Seq(),
-        operationContext,
-        FileFactory.getConfiguration,
-        validSegments.asScala.toList)
+      if (!isCompactionCall) {
+        // Index PrePriming for SI
+        DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
+          indexCarbonTable,
+          Seq(),
+          operationContext,
+          FileFactory.getConfiguration,
+          validSegments.asScala.toList)
+      }
 
       // update the status of all the segments to marked for delete if data load fails, so that
       // next load which is triggered for SI table in post event of main table data load clears
@@ -294,17 +299,24 @@ object SecondaryIndexCreator {
         LOGGER.error("Dataload to secondary index creation has failed")
       }
 
-      val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
-        LoadTableSIPostExecutionEvent(sc.sparkSession,
-          indexCarbonTable.getCarbonTableIdentifier,
-          secondaryIndexModel.carbonLoadModel,
-          indexCarbonTable)
-      OperationListenerBus.getInstance
-        .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+      if (!isCompactionCall) {
+        val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
+          LoadTableSIPostExecutionEvent(sc.sparkSession,
+            indexCarbonTable.getCarbonTableIdentifier,
+            secondaryIndexModel.carbonLoadModel,
+            indexCarbonTable)
+        OperationListenerBus.getInstance
+          .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+      }
 
-      indexCarbonTable
+      if (isCompactionCall) {
+        (indexCarbonTable, segmentLocks, operationContext)
+      } else {
+        (indexCarbonTable, ListBuffer.empty, operationContext)
+      }
     } catch {
       case ex: Exception =>
+        LOGGER.error("Load to SI table failed", ex)
         FileInternalUtil
           .updateTableStatus(validSegmentList,
             secondaryIndexModel.carbonLoadModel.getDatabaseName,
@@ -316,13 +328,8 @@ object SecondaryIndexCreator {
               String](),
             indexCarbonTable,
             sc.sparkSession)
-        LOGGER.error(ex)
         throw ex
     } finally {
-      // release the segment locks
-      segmentLocks.foreach(segmentLock => {
-        segmentLock.unlock()
-      })
       // if some segments are skipped, disable the SI table so that
       // SILoadEventListenerForFailedSegments will take care to load to these segments in next
       // consecutive load to main table.
@@ -339,7 +346,6 @@ object SecondaryIndexCreator {
         if (!isCompactionCall) {
           SegmentStatusManager
             .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
-          TableProcessingOperations.deletePartialLoadDataIfExist(indexCarbonTable, false)
         }
       } catch {
         case e: Exception =>
@@ -351,6 +357,13 @@ object SecondaryIndexCreator {
       if (null != executorService) {
         executorService.shutdownNow()
       }
+
+      // release the segment locks only for load flow
+      if (!isCompactionCall) {
+        segmentLocks.foreach(segmentLock => {
+          segmentLock.unlock()
+        })
+      }
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 498c739..034cd1c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -199,9 +199,11 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          val validSegmentsToUse = validSegments.asScala
+            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
           deleteOldIndexOrMergeIndexFiles(
             carbonLoadModel.getFactTimeStamp,
-            validSegments,
+            validSegmentsToUse.toList.asJava,
             indexCarbonTable)
           mergedSegments.asScala.map { seg =>
             val file = SegmentFileStore.writeSegmentFile(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 253a78b..da70164 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -933,10 +933,10 @@ public final class CarbonDataMergerUtil {
     for (LoadMetadataDetails segment : loadMetadataDetails) {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
-
-        segments.add(Segment.toSegment(segment.getMergedLoadName(), null));
+        segments
+            .add(Segment.getSegment(segment.getMergedLoadName(), segment.getSegmentFile(), null));
       } else {
-        segments.add(Segment.toSegment(segment.getLoadName(), null));
+        segments.add(Segment.getSegment(segment.getLoadName(), segment.getSegmentFile(), null));
       }
     }
     return segments;