Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/04/08 14:48:29 UTC

[carbondata] branch master updated: [CARBONDATA-3759] Refactor segmentRefreshInfo and fix cache issue in multiple application scenario

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f215eaa  [CARBONDATA-3759] Refactor segmentRefreshInfo and fix cache issue in multiple application scenario
f215eaa is described below

commit f215eaad3b80f14032314f27c0a83f0ea77696a1
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Sat Mar 28 17:21:30 2020 +0530

    [CARBONDATA-3759] Refactor segmentRefreshInfo and fix cache issue in multiple application scenario
    
    Why is this PR needed?
    Currently, segmentRefreshInfo helps clear the cache only in update cases; it fails to refresh the cache when segment files change or are updated.
    When two applications run on the same store, one application may change segment files, clear its old cache, and possibly delete files. Those changes are not reflected in the other application, which can lead to wrong results or query failures.
    
    What changes were proposed in this PR?
    Refactor segmentRefreshInfo so that the cache is cleared and updated whenever segments are updated or the segment files of the respective segments change (a simplified sketch of the new refresh check follows the diffstat below).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    Tested in a cluster; the existing test cases are sufficient.
    
    This closes #3686
---
 .../core/datamap/DataMapStoreManager.java          | 59 +++++++++++++++-------
 .../LatestFilesReadCommittedScope.java             |  2 +-
 .../TableStatusReadCommittedScope.java             | 12 ++++-
 .../core/statusmanager/SegmentRefreshInfo.java     | 28 +++++-----
 .../statusmanager/SegmentUpdateStatusManager.java  | 25 ++++-----
 .../hadoop/api/CarbonTableInputFormat.java         | 16 +++---
 .../indexserver/DistributedRDDUtils.scala          |  2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 18 ++++---
 .../joins/BroadCastSIFilterPushJoin.scala          |  2 +-
 9 files changed, 98 insertions(+), 66 deletions(-)
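
For readers skimming the patch, the core idea is that each segment's cache entry is now keyed off two timestamps instead of one: the update-delta timestamp (as before) and the last-modified time of the segment file itself, so a segment rewritten by another application is detected and refreshed. The stand-alone sketch below models that comparison; the class name mirrors the patch, but it is a simplified illustration, not the CarbonData source.

// Simplified model of the patched SegmentRefreshInfo: a cached entry is stale when either
// timestamp moves forward or the file count changes.
public class SegmentRefreshSketch {

  static class SegmentRefreshInfo {
    final long segmentUpdatedTimestamp;  // from the update delta (0 when never updated)
    final int countOfFileInSegment;
    final long segmentFileTimestamp;     // last-modified time of the segment file (new in this patch)

    SegmentRefreshInfo(long updatedTs, int fileCount, long segmentFileTs) {
      this.segmentUpdatedTimestamp = updatedTs;
      this.countOfFileInSegment = fileCount;
      this.segmentFileTimestamp = segmentFileTs;
    }

    // Mirrors the reworked compare(): true means the cached entry should be refreshed.
    boolean compare(SegmentRefreshInfo that) {
      return segmentUpdatedTimestamp > that.segmentUpdatedTimestamp
          || segmentFileTimestamp > that.segmentFileTimestamp
          || countOfFileInSegment != that.countOfFileInSegment;
    }
  }

  public static void main(String[] args) {
    SegmentRefreshInfo cached = new SegmentRefreshInfo(0L, 2, 1_586_300_000_000L);
    // Another application rewrote the segment file, bumping its last-modified time.
    SegmentRefreshInfo current = new SegmentRefreshInfo(0L, 2, 1_586_400_000_000L);
    System.out.println("refresh needed: " + current.compare(cached));  // prints true
  }
}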

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 65ddc5c..cbfa071 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.IndexFactory;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory;
@@ -44,13 +45,14 @@ import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
 import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
 
@@ -502,11 +504,12 @@ public final class DataMapStoreManager {
   }
 
   public List<String> getSegmentsToBeRefreshed(CarbonTable carbonTable,
-      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess) {
+      List<Segment> filteredSegmentToAccess) {
     List<String> toBeCleanedSegments = new ArrayList<>();
     for (Segment filteredSegment : filteredSegmentToAccess) {
       boolean refreshNeeded = getTableSegmentRefresher(carbonTable).isRefreshNeeded(filteredSegment,
-          updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
+          SegmentUpdateStatusManager
+              .getInvalidTimestampRange(filteredSegment.getLoadMetadataDetails()));
       if (refreshNeeded) {
         toBeCleanedSegments.add(filteredSegment.getSegmentNo());
       }
@@ -518,7 +521,7 @@ public final class DataMapStoreManager {
       SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
       throws IOException {
     List<String> toBeCleanedSegments =
-        getSegmentsToBeRefreshed(carbonTable, updateStatusManager, filteredSegmentToAccess);
+        getSegmentsToBeRefreshed(carbonTable, filteredSegmentToAccess);
     if (toBeCleanedSegments.size() > 0) {
       clearInvalidSegments(carbonTable, toBeCleanedSegments);
     }
@@ -739,17 +742,30 @@ public final class DataMapStoreManager {
     private Map<String, Boolean> manualSegmentRefresh = new HashMap<>();
 
     TableSegmentRefresher(CarbonTable table) {
-      SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(table);
-      SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
-      for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
-        UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
+      SegmentStatusManager segmentStatusManager =
+          new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+      List<Segment> validSegments;
+      try {
+        validSegments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments();
+      } catch (IOException e) {
+        LOGGER.error("Error while getting the valid segments.", e);
+        throw new RuntimeException(e);
+      }
+      for (Segment segment : validSegments) {
+        UpdateVO updateVO =
+            SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
         SegmentRefreshInfo segmentRefreshInfo;
-        if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
-          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0);
+        if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null
+            || segment.getSegmentFileName() != null) {
+          long segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
+              .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
+              .getLastModifiedTime();
+          segmentRefreshInfo =
+              new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0, segmentFileTimeStamp);
         } else {
-          segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+          segmentRefreshInfo = new SegmentRefreshInfo(0L, 0, 0L);
         }
-        segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo);
+        segmentRefreshTime.put(segment.getSegmentNo(), segmentRefreshInfo);
       }
     }
 
@@ -757,14 +773,23 @@ public final class DataMapStoreManager {
       SegmentRefreshInfo segmentRefreshInfo =
           seg.getSegmentRefreshInfo(updateVo);
       String segmentId = seg.getSegmentNo();
-      if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null) {
+      if (segmentRefreshInfo.getSegmentUpdatedTimestamp() == null
+          && segmentRefreshInfo.getSegmentFileTimestamp() == 0) {
         return false;
       }
-      if (segmentRefreshTime.get(segmentId) == null
-          && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
-        segmentRefreshTime.put(segmentId, segmentRefreshInfo);
-        return true;
+
+      if (segmentRefreshTime.get(segmentId) == null) {
+        if (segmentRefreshInfo.getSegmentUpdatedTimestamp() != null
+            && segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
+          segmentRefreshTime.put(segmentId, segmentRefreshInfo);
+          return true;
+        }
+        if (segmentRefreshInfo.getSegmentFileTimestamp() != 0) {
+          segmentRefreshTime.put(segmentId, segmentRefreshInfo);
+          return true;
+        }
       }
+
       if (manualSegmentRefresh.get(segmentId) != null && manualSegmentRefresh.get(segmentId)) {
         manualSegmentRefresh.put(segmentId, false);
         return true;
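
The first-seen handling above can be hard to follow in diff form. The self-contained approximation below captures the intended flow, assuming that already-cached segments are compared along the lines of SegmentRefreshInfo.compare(); method and field names are illustrative rather than the actual CarbonData code.

import java.util.HashMap;
import java.util.Map;

// Illustrative approximation of the reworked isRefreshNeeded(): a segment with either a
// non-zero update timestamp or a non-zero segment-file timestamp is recorded on first
// sight and reported as needing a refresh; later sightings refresh only if a timestamp advances.
public class RefreshDecisionSketch {

  private final Map<String, long[]> segmentRefreshTime = new HashMap<>();

  boolean isRefreshNeeded(String segmentId, long updatedTs, long segmentFileTs) {
    if (updatedTs == 0 && segmentFileTs == 0) {
      return false;                                         // nothing to compare against yet
    }
    long[] cached = segmentRefreshTime.get(segmentId);
    if (cached == null) {
      segmentRefreshTime.put(segmentId, new long[] {updatedTs, segmentFileTs});
      return true;                                          // first sighting: refresh and remember
    }
    return updatedTs > cached[0] || segmentFileTs > cached[1];
  }

  public static void main(String[] args) {
    RefreshDecisionSketch refresher = new RefreshDecisionSketch();
    System.out.println(refresher.isRefreshNeeded("2", 0L, 1_586_400_000_000L)); // true (first sight)
    System.out.println(refresher.isRefreshNeeded("2", 0L, 1_586_400_000_000L)); // false (unchanged)
    System.out.println(refresher.isRefreshNeeded("2", 0L, 1_586_500_000_000L)); // true (file rewritten)
  }
}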
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 6975620..7352bca 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -220,7 +220,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
         if (indexFileStore.get(timestamp) == null) {
           indexList = new ArrayList<>(1);
           segmentRefreshInfo =
-              new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0);
+              new SegmentRefreshInfo(carbonIndexFiles[i].getLastModifiedTime(), 0, 0L);
           segmentTimestampUpdaterMap.put(timestamp, segmentRefreshInfo);
         } else {
           // Entry is already present.
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 5d4ed4e..17a010e 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -93,10 +94,17 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
 
   public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) {
     SegmentRefreshInfo segmentRefreshInfo;
+    long segmentFileTimeStamp = 0L;
+    if (null != segment.getSegmentFileName()) {
+      segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
+          .getSegmentFilePath(identifier.getTablePath(), segment.getSegmentFileName()))
+          .getLastModifiedTime();
+    }
     if (updateVo != null) {
-      segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0);
+      segmentRefreshInfo =
+          new SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0, segmentFileTimeStamp);
     } else {
-      segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
+      segmentRefreshInfo = new SegmentRefreshInfo(0L, 0, segmentFileTimeStamp);
     }
     return segmentRefreshInfo;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
index 88b9176..d7b32cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentRefreshInfo.java
@@ -21,12 +21,17 @@ import java.io.Serializable;
 
 public class SegmentRefreshInfo implements Serializable {
 
-  private Long segmentUpdatedTimestamp;
+  private Long segmentUpdatedTimestamp = 0L;
   private Integer countOfFileInSegment;
+  private Long segmentFileTimestamp = 0L;
 
-  public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer countOfFileInSegment) {
-    this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+  public SegmentRefreshInfo(Long segmentUpdatedTimestamp, Integer countOfFileInSegment,
+      Long segmentFileTimestamp) {
+    if (segmentUpdatedTimestamp != null) {
+      this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
+    }
     this.countOfFileInSegment = countOfFileInSegment;
+    this.segmentFileTimestamp = segmentFileTimestamp;
   }
 
   public Long getSegmentUpdatedTimestamp() {
@@ -37,24 +42,21 @@ public class SegmentRefreshInfo implements Serializable {
     this.segmentUpdatedTimestamp = segmentUpdatedTimestamp;
   }
 
-  public Integer getCountOfFileInSegment() {
-    return countOfFileInSegment;
-  }
-
   public void setCountOfFileInSegment(Integer countOfFileInSegment) {
     this.countOfFileInSegment = countOfFileInSegment;
   }
 
+  public Long getSegmentFileTimestamp() {
+    return segmentFileTimestamp;
+  }
+
   public boolean compare(Object o) {
     if (!(o instanceof SegmentRefreshInfo)) return false;
 
     SegmentRefreshInfo that = (SegmentRefreshInfo) o;
-
-    if (segmentUpdatedTimestamp > that.segmentUpdatedTimestamp || !countOfFileInSegment
-        .equals(that.countOfFileInSegment)) {
-      return true;
-    }
-    return false;
+    return segmentUpdatedTimestamp > that.segmentUpdatedTimestamp
+        || segmentFileTimestamp > that.segmentFileTimestamp || !countOfFileInSegment
+        .equals(that.countOfFileInSegment);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 2d5500b..bb17d53 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -783,23 +783,18 @@ public class SegmentUpdateStatusManager {
 
   /**
    * Returns the invalid timestamp range of a segment.
-   * @param segmentId
-   * @return
    */
-  public UpdateVO getInvalidTimestampRange(String segmentId) {
+  public static UpdateVO getInvalidTimestampRange(LoadMetadataDetails loadMetadataDetails) {
     UpdateVO range = new UpdateVO();
-    for (LoadMetadataDetails segment : segmentDetails) {
-      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
-        range.setSegmentId(segmentId);
-        range.setFactTimestamp(segment.getLoadStartTime());
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment
-            .getUpdateDeltaEndTimestamp().isEmpty()) {
-          range.setUpdateDeltaStartTimestamp(
-              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
-          range.setLatestUpdateTimestamp(
-              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
-        }
-        return range;
+    if (loadMetadataDetails != null) {
+      range.setSegmentId(loadMetadataDetails.getLoadName());
+      range.setFactTimestamp(loadMetadataDetails.getLoadStartTime());
+      if (!loadMetadataDetails.getUpdateDeltaStartTimestamp().isEmpty() && !loadMetadataDetails
+          .getUpdateDeltaEndTimestamp().isEmpty()) {
+        range.setUpdateDeltaStartTimestamp(CarbonUpdateUtil
+            .getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaStartTimestamp()));
+        range.setLatestUpdateTimestamp(
+            CarbonUpdateUtil.getTimeStampAsLong(loadMetadataDetails.getUpdateDeltaEndTimestamp()));
       }
     }
     return range;
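
getInvalidTimestampRange is now static and takes the segment's LoadMetadataDetails directly, so callers (see the CarbonTableInputFormat and CarbonMergerRDD hunks below) no longer need a per-table SegmentUpdateStatusManager instance just to look up one segment's range, and a null argument yields an empty range. A stand-alone model of that extraction, using hypothetical stand-in types rather than the CarbonData classes:

// Stand-alone model of the now-static getInvalidTimestampRange(): it derives the range
// from one segment's load metadata and tolerates a null argument. The types here are
// hypothetical stand-ins for LoadMetadataDetails / UpdateVO, not the CarbonData classes.
public class TimestampRangeSketch {

  static class LoadDetails {
    final String loadName;
    final long loadStartTime;
    final String updateDeltaStart;  // empty string when the segment was never updated
    final String updateDeltaEnd;

    LoadDetails(String loadName, long loadStartTime, String deltaStart, String deltaEnd) {
      this.loadName = loadName;
      this.loadStartTime = loadStartTime;
      this.updateDeltaStart = deltaStart;
      this.updateDeltaEnd = deltaEnd;
    }
  }

  // Static, like the patched method: no SegmentUpdateStatusManager instance is required.
  static long[] getInvalidTimestampRange(LoadDetails details) {
    long factTimestamp = 0L;
    long latestUpdateTimestamp = 0L;
    if (details != null) {
      factTimestamp = details.loadStartTime;
      if (!details.updateDeltaStart.isEmpty() && !details.updateDeltaEnd.isEmpty()) {
        latestUpdateTimestamp = Long.parseLong(details.updateDeltaEnd);
      }
    }
    return new long[] {factTimestamp, latestUpdateTimestamp};
  }

  public static void main(String[] args) {
    LoadDetails neverUpdated = new LoadDetails("2", 1_586_300_000_000L, "", "");
    long[] range = getInvalidTimestampRange(neverUpdated);
    // latestUpdate stays 0 because the segment has no update-delta timestamps.
    System.out.println("segment=" + neverUpdated.loadName
        + ", fact=" + range[0] + ", latestUpdate=" + range[1]);
  }
}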
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bdda845..8fab3a9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -359,13 +359,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           validSegments);
     } else {
       segmentsToBeRefreshed = DataMapStoreManager.getInstance()
-          .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments);
+          .getSegmentsToBeRefreshed(carbonTable, validSegments);
     }
 
     numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
     UpdateVO invalidBlockVOForSegmentId = null;
-    boolean isIUDTable = false;
+    boolean isIUDTable;
 
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
@@ -378,8 +378,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
       // Get the UpdateVO for those tables on which IUD operations being performed.
       if (isIUDTable) {
-        invalidBlockVOForSegmentId =
-            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+        invalidBlockVOForSegmentId = SegmentUpdateStatusManager
+            .getInvalidTimestampRange(inputSplit.getSegment().getLoadMetadataDetails());
       }
       String[] deleteDeltaFilePath = null;
       if (isIUDTable) {
@@ -453,13 +453,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SDK is written one set of files with UUID, with same UUID it can write again.
     So, latest files content should reflect the new count by refreshing the segment */
     List<String> toBeCleanedSegments = new ArrayList<>();
-    for (Segment eachSegment : filteredSegment) {
+    for (Segment segment : filteredSegment) {
       boolean refreshNeeded = DataMapStoreManager.getInstance()
           .getTableSegmentRefresher(getOrCreateCarbonTable(job.getConfiguration()))
-          .isRefreshNeeded(eachSegment,
-              updateStatusManager.getInvalidTimestampRange(eachSegment.getSegmentNo()));
+          .isRefreshNeeded(segment, SegmentUpdateStatusManager
+              .getInvalidTimestampRange(segment.getLoadMetadataDetails()));
       if (refreshNeeded) {
-        toBeCleanedSegments.add(eachSegment.getSegmentNo());
+        toBeCleanedSegments.add(segment.getSegmentNo());
       }
     }
     for (Segment segment : allSegments.getInvalidSegments()) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 2a7aa2d..d0e2eda 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -381,7 +381,7 @@ object DistributedRDDUtils {
         // Adding valid segments to segments to be refreshed, so that the select query
         // goes in the same executor.
         DataMapStoreManager.getInstance
-          .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments.toList.asJava)
+          .getSegmentsToBeRefreshed(carbonTable, validSegments.toList.asJava)
         val indexServerLoadEvent: IndexServerLoadEvent =
           IndexServerLoadEvent(
             sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5c2cc1b..7ff5cc0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -371,24 +371,26 @@ class CarbonMergerRDD[K, V](
         .map(_.asInstanceOf[CarbonInputSplit])
         .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
     }
-    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
-      val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
-      val blockInfo = new TableBlockInfo(entry.getFilePath,
-        entry.getStart, entry.getSegmentId,
-        entry.getLocations, entry.getLength, entry.getVersion,
+    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { inputSplit =>
+      val segmentId = Segment.toSegment(inputSplit.getSegmentId).getSegmentNo
+      val blockInfo = new TableBlockInfo(inputSplit.getFilePath,
+        inputSplit.getStart, inputSplit.getSegmentId,
+        inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion,
         updateStatusManager.getDeleteDeltaFilePath(
-          entry.getFilePath,
+          inputSplit.getFilePath,
           segmentId)
       )
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-        updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
+        updateDetails = SegmentUpdateStatusManager.getInvalidTimestampRange(inputSplit
+          .getSegment
+          .getLoadMetadataDetails)
       }
       // filter splits with V3 data file format
       // if split is updated, then check for if it is valid segment based on update details
       (!updated ||
        (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
          updateDetails, updateStatusManager)))) &&
-      FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+      FileFormat.COLUMNAR_V3.equals(inputSplit.getFileFormat)
     }
     if (rangeColumn != null) {
       totalTaskCount = totalTaskCount +
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
index 16a2091..b75380c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
@@ -337,7 +337,7 @@ object BroadCastSIFilterPushJoin {
       if (CarbonProperties.getInstance
         .isDistributedPruningEnabled(carbonTable.getDatabaseName, carbonTable.getTableName)) {
         val segmentsToBeRefreshed: util.List[String] = DataMapStoreManager.getInstance
-          .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegmentsToAccess)
+          .getSegmentsToBeRefreshed(carbonTable, validSegmentsToAccess)
         try {
           val dataMapFormat: IndexInputFormat =
             new IndexInputFormat(carbonTable,