You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/07/24 12:58:15 UTC
[carbondata] branch master updated: [CARBONDATA-3920] Fix
compaction failure issue for SI table and metadata mismatch in concurrency
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 30eefe3 [CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency
30eefe3 is described below
commit 30eefe3d226e35bd7c9ed0f1c4c77c5b22809b76
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon Jul 20 14:50:49 2020 +0530
[CARBONDATA-3920] Fix compaction failure issue for SI table and metadata mismatch in concurrency
Why is this PR needed?
When load and compaction happen concurrently, reliability tests show that segment data gets deleted from the SI table, which leads to exceptions/failures.
Pre-priming was happening for the SI table segment during compaction before the SI segment was marked as success.
What changes were proposed in this PR?
Remove the unnecessary cleaning API call from the SI flow; segment locks for SI were being released before compaction completed successfully, so handle that case.
Refactor the SI load that runs after main table compaction so that pre-priming is triggered properly, only after the segments have been marked as success.
Does this PR introduce any user interface change?
No
Is any new testcase added?
No (tested in a cluster with 10 concurrent operations and around 1000 loads)
This closes #3854
---
.../org/apache/carbondata/core/index/Segment.java | 5 ++
.../spark/rdd/CarbonTableCompactor.scala | 15 +++++
.../events/CleanFilesPostEventListener.scala | 68 +++++++++++++++++++++-
.../SILoadEventListenerForFailedSegments.scala | 23 +++++++-
.../spark/sql/secondaryindex/load/Compactor.scala | 28 ++++++++-
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 59 +++++++++++--------
.../secondaryindex/util/SecondaryIndexUtil.scala | 4 +-
.../processing/merger/CarbonDataMergerUtil.java | 6 +-
8 files changed, 176 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 8fb22bc..202a7d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -275,6 +275,11 @@ public class Segment implements Serializable, Writable {
return null;
}
+ public static Segment getSegment(String segmentNo, String segmentFileName,
+ ReadCommittedScope readCommittedScope) {
+ return new Segment(segmentNo, segmentFileName, readCommittedScope);
+ }
+
public Configuration getConfiguration() {
return readCommittedScope.getConfiguration();
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 83d8935..3187f14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -93,6 +95,15 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction()
val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+ var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+ loadsToMerge.asScala.foreach { segmentId =>
+ val segmentLock = CarbonLockFactory
+ .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ .getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
+ segmentLock.lockWithRetries()
+ segmentLocks += segmentLock
+ }
try {
scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
} catch {
@@ -117,6 +128,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
Array(compactedLoadToClear))
}
throw e
+ } finally {
+ segmentLocks.foreach { segmentLock =>
+ segmentLock.unlock()
+ }
}
// scan again and determine if anything is there to merge again.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index fa1e666..97f7836 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -24,11 +24,18 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}
class CleanFilesPostEventListener extends OperationEventListener with Logging {
@@ -54,7 +61,66 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
indexTable, true, partitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
+ cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
}
}
}
+
+ /**
+ * This method added to clean the segments which are success in SI and may be compacted or marked
+ * for delete in main table, which can happen in case of concurrent scenarios.
+ */
+ def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
+ mainTable: CarbonTable): Unit = {
+ val mainTableStatusLock: ICarbonLock = CarbonLockFactory
+ .getCarbonLockObj(mainTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+ val indexTableStatusLock: ICarbonLock = CarbonLockFactory
+ .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+ var mainTableLocked = false
+ var indexTableLocked = false
+ try {
+ mainTableLocked = mainTableStatusLock.lockWithRetries()
+ indexTableLocked = indexTableStatusLock.lockWithRetries()
+ if (mainTableLocked && indexTableLocked) {
+ val mainTableMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(mainTable.getMetadataPath).toSet ++
+ SegmentStatusManager.readLoadHistoryMetadata(mainTable.getMetadataPath).toSet
+ val indexTableMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath).toSet
+ val segToStatusMap = mainTableMetadataDetails
+ .map(detail => detail.getLoadName -> detail.getSegmentStatus).toMap
+
+ val unnecessarySegmentsOfSI = indexTableMetadataDetails.filter { indexDetail =>
+ indexDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
+ segToStatusMap.contains(indexDetail.getLoadName) &&
+ (segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.COMPACTED) ||
+ segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.MARKED_FOR_DELETE))
+ }
+ LOGGER.info(s"Unwanted SI segments are: $unnecessarySegmentsOfSI")
+ unnecessarySegmentsOfSI.foreach { detail =>
+ val carbonFile = FileFactory
+ .getCarbonFile(CarbonTablePath
+ .getSegmentPath(indexTable.getTablePath, detail.getLoadName))
+ CarbonUtil.deleteFoldersAndFiles(carbonFile)
+ }
+ unnecessarySegmentsOfSI.foreach { detail =>
+ detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
+ detail.setVisibility("false")
+ }
+
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
+ unnecessarySegmentsOfSI.toArray)
+ } else {
+ LOGGER.error("Unable to get the lock file for main/Index table. Please try again later")
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("clean up of unwanted SI segments failed", ex)
+ // ignore the exception
+ } finally {
+ indexTableStatusLock.unlock()
+ mainTableStatusLock.unlock()
+ }
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index cae33ed..71c6559 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.secondaryindex.events
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
@@ -33,7 +34,7 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -91,6 +92,7 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
if (!isLoadSIForFailedSegments
|| !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
mainTblLoadMetadataDetails,
@@ -166,8 +168,19 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
- failedLoadMetadataDetails.add(detail(0))
+ // in concurrent scenario, if a compaction is going on table, then SI
+ // segments are updated first in table status and then the main table
+ // segment, so if any load runs in parallel this listener shouldn't consider
+ // those segments accidentally. So try to take the segment lock.
+ val segmentLockOfProbableOngngCompactionSeg = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+ LockUsage.LOCK)
+ if (segmentLockOfProbableOngngCompactionSeg.lockWithRetries()) {
+ segmentLocks += segmentLockOfProbableOngngCompactionSeg
+ LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
+ failedLoadMetadataDetails.add(detail(0))
+ }
}
}
})
@@ -221,6 +234,10 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
LOGGER.error(s"Load to SI table to $indexTableName is failed " +
s"or SI table ENABLE is failed. ", ex)
return
+ } finally {
+ segmentLocks.foreach {
+ segmentLock => segmentLock.unlock()
+ }
}
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index 39b3d94..7f4ae00 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -19,19 +19,25 @@ package org.apache.spark.sql.secondaryindex.load
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
+import org.apache.spark.sql.secondaryindex.events.LoadTableSIPostExecutionEvent
import org.apache.spark.sql.secondaryindex.rdd.SecondaryIndexCreator
import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.ICarbonLock
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.events.OperationListenerBus
+import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
object Compactor {
@@ -60,6 +66,7 @@ object Compactor {
} else {
java.util.Collections.emptyIterator()
}
+ var allSegmentsLock: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer.empty
while (iterator.hasNext) {
val index = iterator.next()
val indexColumns = index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList
@@ -76,11 +83,12 @@ object Compactor {
try {
val segmentToSegmentTimestampMap: util.Map[String, String] = new java.util
.HashMap[String, String]()
- val indexCarbonTable = SecondaryIndexCreator
+ val (indexCarbonTable, segmentLocks, operationContext) = SecondaryIndexCreator
.createSecondaryIndex(secondaryIndexModel,
segmentToSegmentTimestampMap, null,
forceAccessSegment, isCompactionCall = true,
isLoadToFailedSISegments = false)
+ allSegmentsLock ++= segmentLocks
CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
indexCarbonTable,
loadsToMerge,
@@ -122,6 +130,19 @@ object Compactor {
segmentIdToLoadStartTimeMapping(validSegments.head),
SegmentStatus.SUCCESS,
carbonLoadModelForMergeDataFiles.getFactTimeStamp, rebuiltSegments.toList.asJava)
+
+ // Index PrePriming for SI
+ DistributedRDDUtils.triggerPrepriming(sparkSession, indexCarbonTable, Seq(),
+ operationContext, FileFactory.getConfiguration, validSegments)
+
+ val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
+ LoadTableSIPostExecutionEvent(sparkSession,
+ indexCarbonTable.getCarbonTableIdentifier,
+ secondaryIndexModel.carbonLoadModel,
+ indexCarbonTable)
+ OperationListenerBus.getInstance
+ .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+
siCompactionIndexList ::= indexCarbonTable
} catch {
case ex: Exception =>
@@ -136,6 +157,11 @@ object Compactor {
""".stripMargin).collect()
}
throw ex
+ } finally {
+ // once compaction is success, release the segment locks
+ allSegmentsLock.foreach { segmentLock =>
+ segmentLock.unlock()
+ }
}
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 04cfbb0..e897051 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -59,7 +59,7 @@ object SecondaryIndexCreator {
indexTable: CarbonTable,
forceAccessSegment: Boolean = false,
isCompactionCall: Boolean,
- isLoadToFailedSISegments: Boolean): CarbonTable = {
+ isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
var indexCarbonTable = indexTable
val sc = secondaryIndexModel.sqlContext
// get the thread pool size for secondary index creation
@@ -76,7 +76,7 @@ object SecondaryIndexCreator {
indexCarbonTable = metastore
.lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
- .sparkSession).asInstanceOf[CarbonRelation].carbonTable
+ .sparkSession).carbonTable
}
val operationContext = new OperationContext
@@ -119,6 +119,9 @@ object SecondaryIndexCreator {
}
validSegmentList = validSegments.asScala.toList
+ if (validSegmentList.isEmpty) {
+ return (indexCarbonTable, segmentLocks, operationContext)
+ }
LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
s"for segments: $validSegmentList")
@@ -266,13 +269,15 @@ object SecondaryIndexCreator {
rebuiltSegments)
}
- // Index PrePriming for SI
- DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
- indexCarbonTable,
- Seq(),
- operationContext,
- FileFactory.getConfiguration,
- validSegments.asScala.toList)
+ if (!isCompactionCall) {
+ // Index PrePriming for SI
+ DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
+ indexCarbonTable,
+ Seq(),
+ operationContext,
+ FileFactory.getConfiguration,
+ validSegments.asScala.toList)
+ }
// update the status of all the segments to marked for delete if data load fails, so that
// next load which is triggered for SI table in post event of main table data load clears
@@ -294,17 +299,24 @@ object SecondaryIndexCreator {
LOGGER.error("Dataload to secondary index creation has failed")
}
- val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
- LoadTableSIPostExecutionEvent(sc.sparkSession,
- indexCarbonTable.getCarbonTableIdentifier,
- secondaryIndexModel.carbonLoadModel,
- indexCarbonTable)
- OperationListenerBus.getInstance
- .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+ if (!isCompactionCall) {
+ val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
+ LoadTableSIPostExecutionEvent(sc.sparkSession,
+ indexCarbonTable.getCarbonTableIdentifier,
+ secondaryIndexModel.carbonLoadModel,
+ indexCarbonTable)
+ OperationListenerBus.getInstance
+ .fireEvent(loadTableSIPostExecutionEvent, operationContext)
+ }
- indexCarbonTable
+ if (isCompactionCall) {
+ (indexCarbonTable, segmentLocks, operationContext)
+ } else {
+ (indexCarbonTable, ListBuffer.empty, operationContext)
+ }
} catch {
case ex: Exception =>
+ LOGGER.error("Load to SI table failed", ex)
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
@@ -316,13 +328,8 @@ object SecondaryIndexCreator {
String](),
indexCarbonTable,
sc.sparkSession)
- LOGGER.error(ex)
throw ex
} finally {
- // release the segment locks
- segmentLocks.foreach(segmentLock => {
- segmentLock.unlock()
- })
// if some segments are skipped, disable the SI table so that
// SILoadEventListenerForFailedSegments will take care to load to these segments in next
// consecutive load to main table.
@@ -339,7 +346,6 @@ object SecondaryIndexCreator {
if (!isCompactionCall) {
SegmentStatusManager
.deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
- TableProcessingOperations.deletePartialLoadDataIfExist(indexCarbonTable, false)
}
} catch {
case e: Exception =>
@@ -351,6 +357,13 @@ object SecondaryIndexCreator {
if (null != executorService) {
executorService.shutdownNow()
}
+
+ // release the segment locks only for load flow
+ if (!isCompactionCall) {
+ segmentLocks.foreach(segmentLock => {
+ segmentLock.unlock()
+ })
+ }
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 498c739..034cd1c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -199,9 +199,11 @@ object SecondaryIndexUtil {
}
if (finalMergeStatus) {
if (null != mergeStatus && mergeStatus.length != 0) {
+ val validSegmentsToUse = validSegments.asScala
+ .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
deleteOldIndexOrMergeIndexFiles(
carbonLoadModel.getFactTimeStamp,
- validSegments,
+ validSegmentsToUse.toList.asJava,
indexCarbonTable)
mergedSegments.asScala.map { seg =>
val file = SegmentFileStore.writeSegmentFile(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 253a78b..da70164 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -933,10 +933,10 @@ public final class CarbonDataMergerUtil {
for (LoadMetadataDetails segment : loadMetadataDetails) {
//check if this load is an already merged load.
if (null != segment.getMergedLoadName()) {
-
- segments.add(Segment.toSegment(segment.getMergedLoadName(), null));
+ segments
+ .add(Segment.getSegment(segment.getMergedLoadName(), segment.getSegmentFile(), null));
} else {
- segments.add(Segment.toSegment(segment.getLoadName(), null));
+ segments.add(Segment.getSegment(segment.getLoadName(), segment.getSegmentFile(), null));
}
}
return segments;