You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/05/03 21:55:36 UTC

[pinot] branch master updated: Allow custom lineage update and gc (#10679)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ffd804e4d Allow custom lineage update and gc (#10679)
1ffd804e4d is described below

commit 1ffd804e4da35ac14440a648316213b18b98f41c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Wed May 3 14:55:25 2023 -0700

    Allow custom lineage update and gc (#10679)
---
 .../pinot/common/lineage/SegmentLineage.java       |  32 ++++-
 .../resources/EndReplaceSegmentsRequest.java       |  16 ++-
 ...uest.java => RevertReplaceSegmentsRequest.java} |  24 ++--
 .../resources/StartReplaceSegmentsRequest.java     |  11 +-
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../PinotSegmentUploadDownloadRestletResource.java |  18 +--
 .../helix/core/PinotHelixResourceManager.java      |  82 ++++++++++---
 .../helix/core/lineage/DefaultLineageManager.java  | 130 +++++++++++++++++++++
 .../helix/core/lineage/LineageManager.java         |  73 ++++++++++++
 .../helix/core/lineage/LineageManagerFactory.java  |  45 +++++++
 .../helix/core/retention/RetentionManager.java     |  70 +----------
 .../api/PinotSegmentRestletResourceTest.java       |   5 +-
 .../PinotHelixResourceManagerStatelessTest.java    |  62 +++++-----
 .../BaseMultipleSegmentsConversionExecutor.java    |   2 +-
 .../local/utils/ConsistentDataPushUtils.java       |   2 +-
 15 files changed, 440 insertions(+), 143 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
index 353d71ad5a..b3776ddc9f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -46,18 +47,22 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 public class SegmentLineage {
   private static final String COMMA_SEPARATOR = ",";
+  private static final String CUSTOM_MAP_KEY = "custom.map";
 
   private final String _tableNameWithType;
   private final Map<String, LineageEntry> _lineageEntries;
+  private Map<String, String> _customMap = null;
 
   public SegmentLineage(String tableNameWithType) {
     _tableNameWithType = tableNameWithType;
     _lineageEntries = new HashMap<>();
   }
 
-  public SegmentLineage(String tableNameWithType, Map<String, LineageEntry> lineageEntries) {
+  public SegmentLineage(String tableNameWithType, Map<String, LineageEntry> lineageEntries,
+      @Nullable Map<String, String> customMap) {
     _tableNameWithType = tableNameWithType;
     _lineageEntries = lineageEntries;
+    _customMap = customMap;
   }
 
   public String getTableNameWithType() {
@@ -115,6 +120,22 @@ public class SegmentLineage {
     _lineageEntries.remove(lineageEntryId);
   }
 
+  /**
+   * Retrieve the custom map attached to this segment lineage
+   * @return custom map, or null if none has been set
+   */
+  public Map<String, String> getCustomMap() {
+    return _customMap;
+  }
+
+  /**
+   * Set the custom map attached to this segment lineage
+   * @param customMap custom map to store (may be null)
+   */
+  public void setCustomMap(Map<String, String> customMap) {
+    _customMap = customMap;
+  }
+
   /**
    * Convert ZNRecord to segment lineage
    * @param record ZNRecord representation of the segment lineage
@@ -124,6 +145,7 @@ public class SegmentLineage {
     String tableNameWithType = record.getId();
     Map<String, LineageEntry> lineageEntries = new HashMap<>();
     Map<String, List<String>> listFields = record.getListFields();
+    Map<String, String> customMap = record.getMapField(CUSTOM_MAP_KEY);
     for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
@@ -134,7 +156,7 @@ public class SegmentLineage {
       long timestamp = Long.parseLong(value.get(3));
       lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo, state, timestamp));
     }
-    return new SegmentLineage(tableNameWithType, lineageEntries);
+    return new SegmentLineage(tableNameWithType, lineageEntries, customMap);
   }
 
   /**
@@ -152,6 +174,9 @@ public class SegmentLineage {
       List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state, timestamp);
       znRecord.setListField(entry.getKey(), listEntry);
     }
+    if (_customMap != null) {
+      znRecord.setMapField(CUSTOM_MAP_KEY, _customMap);
+    }
     return znRecord;
   }
 
@@ -167,6 +192,9 @@ public class SegmentLineage {
         .sorted(Map.Entry.comparingByValue(Comparator.comparingLong(LineageEntry::getTimestamp)))
         .forEachOrdered(x -> sortedLineageEntries.put(x.getKey(), x.getValue()));
     jsonObject.set("lineageEntries", JsonUtils.objectToJsonNode(sortedLineageEntries));
+    if (_customMap != null) {
+      jsonObject.set("customMap", JsonUtils.objectToJsonNode(_customMap));
+    }
     return jsonObject;
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
index 166c35ab30..5247b41edf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
@@ -21,21 +21,33 @@ package org.apache.pinot.common.restlet.resources;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
- * segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
+ * Request object for endReplaceSegments API.
+ *
+ * 1. segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
  * startReplaceSegments can get created. If only a subset of the original list eventually gets created,
  * we need to be able to supply that list to the replacement protocol, so that the remaining
  * segments that did not get created can be ignored.
+ * 2. customMap : optional key-value pairs handed to the lineage manager when the replacement ends.
  */
 public class EndReplaceSegmentsRequest {
   private final List<String> _segmentsTo;
+  private final Map<String, String> _customMap;
 
-  public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
+  public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo,
+      @JsonProperty("customMap") @Nullable Map<String, String> customMap) {
     _segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
+    _customMap = customMap;
   }
+
   public List<String> getSegmentsTo() {
     return _segmentsTo;
   }
+
+  public Map<String, String> getCustomMap() {
+    return _customMap;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
similarity index 56%
copy from pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
copy to pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
index 166c35ab30..b4ed79aaf2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
@@ -19,23 +19,23 @@
 package org.apache.pinot.common.restlet.resources;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
+
 /**
- * segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
- * startReplaceSegments can get created. If only a subset of the original list eventually gets created,
- * we need to be able to supply that list to the replacement protocol, so that the remaining
- * segments that did not get created can be ignored.
+ * Request object for revertReplaceSegments API.
+ *
+ * customMap : optional key-value pairs handed to the lineage manager when the replacement is reverted.
  */
-public class EndReplaceSegmentsRequest {
-  private final List<String> _segmentsTo;
+public class RevertReplaceSegmentsRequest {
+  private final Map<String, String> _customMap;
 
-  public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
-    _segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
+  public RevertReplaceSegmentsRequest(@JsonProperty("customMap") @Nullable Map<String, String> customMap) {
+    _customMap = customMap;
   }
-  public List<String> getSegmentsTo() {
-    return _segmentsTo;
+
+  public Map<String, String> getCustomMap() {
+    return _customMap;
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
index 933eff4478..523fdb9b26 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 
@@ -31,17 +32,21 @@ import javax.annotation.Nullable;
  * 1. segmentsFrom : original segments. This field can be empty in case the user tries to upload the original segments
  *    and wants to achieve the atomic update of multiple segments.
  * 2. segmentsTo : merged segments.
+ * 3. customMap : optional key-value pairs handed to the lineage manager when the replacement starts.
  */
 public class StartReplaceSegmentsRequest {
   private final List<String> _segmentsFrom;
   private final List<String> _segmentsTo;
+  private final Map<String, String> _customMap;
 
   public StartReplaceSegmentsRequest(@JsonProperty("segmentsFrom") @Nullable List<String> segmentsFrom,
-      @JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
+      @JsonProperty("segmentsTo") @Nullable List<String> segmentsTo,
+      @JsonProperty("customMap") @Nullable Map<String, String> customMap) {
     _segmentsFrom = (segmentsFrom == null) ? Collections.emptyList() : segmentsFrom;
     _segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
     Preconditions.checkArgument(!_segmentsFrom.isEmpty() || !_segmentsTo.isEmpty(),
         "'segmentsFrom' and 'segmentsTo' cannot both be empty");
+    _customMap = customMap;
   }
 
   public List<String> getSegmentsFrom() {
@@ -51,4 +56,8 @@ public class StartReplaceSegmentsRequest {
   public List<String> getSegmentsTo() {
     return _segmentsTo;
   }
+
+  public Map<String, String> getCustomMap() {
+    return _customMap;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8a07fe25be..61141aaa31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -258,6 +258,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ACCESS_CONTROL_FACTORY_CLASS = "controller.admin.access.control.factory.class";
   public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username";
   public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password";
+  public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class";
   // Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push
   // protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one.
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
@@ -285,6 +286,8 @@ public class ControllerConf extends PinotConfiguration {
       "org.apache.pinot.controller.api.access.AllowAllAccessFactory";
   private static final String DEFAULT_ACCESS_CONTROL_USERNAME = "admin";
   private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
+  private static final String DEFAULT_LINEAGE_MANAGER =
+      "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
   private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
@@ -833,6 +836,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(ACCESS_CONTROL_FACTORY_CLASS, accessControlFactoryClass);
   }
 
+  public String getLineageManagerClass() {
+    return getProperty(LINEAGE_MANAGER_CLASS, DEFAULT_LINEAGE_MANAGER);
+  }
+
+  public void setLineageManagerClass(String lineageModifierClass) {
+    setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
+  }
+
   public long getSegmentUploadTimeoutInMillis() {
     return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index ab02733473..31ca292f6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
+import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
 import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.URIUtils;
@@ -630,7 +631,8 @@ public class PinotSegmentUploadDownloadRestletResource {
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
     try {
       String segmentLineageEntryId = _pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
-          startReplaceSegmentsRequest.getSegmentsFrom(), startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
+          startReplaceSegmentsRequest.getSegmentsFrom(), startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
+          startReplaceSegmentsRequest.getCustomMap());
       return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build();
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
@@ -646,11 +648,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   public Response endReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
-      @ApiParam(value = "Fields belonging to end replace segment request")
-        EndReplaceSegmentsRequest endReplaceSegmentsRequest,
       @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API", required = true)
-      @QueryParam("segmentLineageEntryId") String segmentLineageEntryId
-      ) {
+      @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+      @ApiParam(value = "Fields belonging to end replace segment request", required = false)
+          EndReplaceSegmentsRequest endReplaceSegmentsRequest) {
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == null) {
       throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
@@ -681,7 +682,9 @@ public class PinotSegmentUploadDownloadRestletResource {
       @ApiParam(value = "Segment lineage entry id to revert", required = true) @QueryParam("segmentLineageEntryId")
           String segmentLineageEntryId,
       @ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted")
-      @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
+      @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert,
+      @ApiParam(value = "Fields belonging to revert replace segment request", required = false)
+          RevertReplaceSegmentsRequest revertReplaceSegmentsRequest) {
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == null) {
       throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
@@ -692,7 +695,8 @@ public class PinotSegmentUploadDownloadRestletResource {
     try {
       // Check that the segment lineage entry id is valid
       Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
-      _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert);
+      _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert,
+          revertReplaceSegmentsRequest);
       return Response.ok().build();
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.NUMBER_REVERT_REPLACE_FAILURE, 1);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5610a19678..5a7e29a829 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -112,6 +112,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
+import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.BcryptUtils;
@@ -135,6 +136,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.lineage.LineageManager;
+import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -188,6 +191,10 @@ public class PinotHelixResourceManager {
   private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
   private static final String API_REQUEST_ID_PREFIX = "api-";
 
+  private enum LineageUpdateType {
+    START, END, REVERT
+  }
+
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes
   public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
@@ -219,10 +226,11 @@ public class PinotHelixResourceManager {
   private SegmentDeletionManager _segmentDeletionManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
+  private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables,
-      int deletedSegmentsRetentionInDays) {
+      int deletedSegmentsRetentionInDays, LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _dataDir = dataDir;
@@ -244,12 +252,14 @@ public class PinotHelixResourceManager {
     for (int i = 0; i < _tableUpdaterLocks.length; i++) {
       _tableUpdaterLocks[i] = new Object();
     }
+    _lineageManager = lineageManager;
   }
 
   public PinotHelixResourceManager(ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
-        controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays());
+        controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays(),
+        LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -367,6 +377,15 @@ public class PinotHelixResourceManager {
     return _propertyStore;
   }
 
+  /**
+   * Get the lineage manager.
+   *
+   * @return lineage manager
+   */
+  public LineageManager getLineageManager() {
+    return _lineageManager;
+  }
+
   /**
    * Instance related APIs
    */
@@ -2209,8 +2228,8 @@ public class PinotHelixResourceManager {
       tasks.put(jobId, jobMetadata);
       if (tasks.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
         tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
-                Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-                Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+            Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+            Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
             .collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
             .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
@@ -3379,12 +3398,13 @@ public class PinotHelixResourceManager {
    * @param segmentsFrom a list of segments to be merged
    * @param segmentsTo a list of merged segments
    * @param forceCleanup True for enabling the force segment cleanup
+   * @param customMap custom key-value pairs passed to the lineage manager for this lineage entry
    * @return Segment lineage entry id
    *
    * @throws InvalidConfigException
    */
   public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo,
-      boolean forceCleanup) {
+      boolean forceCleanup, Map<String, String> customMap) {
     // Create a segment lineage entry id
     String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
 
@@ -3534,6 +3554,8 @@ public class PinotHelixResourceManager {
         segmentLineage.addLineageEntry(segmentLineageEntryId,
             new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
 
+        _lineageManager
+            .updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, segmentLineage);
         // Write back to the lineage entry to the property store
         if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
           // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it
@@ -3570,9 +3592,9 @@ public class PinotHelixResourceManager {
    *
    * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
    * metadata.
-   *
    * @param tableNameWithType
    * @param segmentLineageEntryId
+   * @param endReplaceSegmentsRequest optional request body carrying segmentsTo and the custom map
    */
   public void endReplaceSegments(String tableNameWithType, String segmentLineageEntryId,
       @Nullable EndReplaceSegmentsRequest endReplaceSegmentsRequest) {
@@ -3630,8 +3652,11 @@ public class PinotHelixResourceManager {
             new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsTo, LineageEntryState.COMPLETED,
                 System.currentTimeMillis());
 
-        if (writeLineageEntryWithTightLoop(tableNameWithType, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
-            _propertyStore)) {
+        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        Map<String, String> customMap =
+            endReplaceSegmentsRequest == null ? null : endReplaceSegmentsRequest.getCustomMap();
+        if (writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
+            _propertyStore, LineageUpdateType.END, customMap)) {
           // If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
           // routing table because it is possible that there has been no EV change but the routing result may be
           // different after updating the lineage entry.
@@ -3671,11 +3696,12 @@ public class PinotHelixResourceManager {
    *
    * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
    * metadata.
-   *
    * @param tableNameWithType
    * @param segmentLineageEntryId
+   * @param revertReplaceSegmentsRequest optional request body carrying the custom map
    */
-  public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
+  public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert,
+      @Nullable RevertReplaceSegmentsRequest revertReplaceSegmentsRequest) {
     try {
       DEFAULT_RETRY_POLICY.attempt(() -> {
         // Fetch the segment lineage metadata
@@ -3739,8 +3765,11 @@ public class PinotHelixResourceManager {
             new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
                 System.currentTimeMillis());
 
-        if (writeLineageEntryWithTightLoop(tableNameWithType, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
-            _propertyStore)) {
+        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        Map<String, String> customMap =
+            revertReplaceSegmentsRequest == null ? null : revertReplaceSegmentsRequest.getCustomMap();
+        if (writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
+            _propertyStore, LineageUpdateType.REVERT, customMap)) {
           // If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
           // routing table because it is possible that there has been no EV change but the routing result may be
           // different after updating the lineage entry.
@@ -3770,20 +3799,22 @@ public class PinotHelixResourceManager {
 
   /**
    * Update the lineage entry with the tight loop to increase the chance for successful ZK write.
-   *
-   * @param tableNameWithType table name with type
+   * @param tableConfig table config
    * @param lineageEntryId lineage entry id
    * @param lineageEntryToUpdate lineage entry that needs to be updated
    * @param lineageEntryToMatch lineage entry that needs to match with the entry from the newly fetched segment lineage.
    * @param propertyStore property store
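+   * @param lineageUpdateType replace step (START, END or REVERT) that selects which lineage manager hook to call
+   * @param customMap custom key-value pairs forwarded to the selected lineage manager hook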
    */
-  private boolean writeLineageEntryWithTightLoop(String tableNameWithType, String lineageEntryId,
+  private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId,
       LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+      ZkHelixPropertyStore<ZNRecord> propertyStore,
+      LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
     for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
       // Fetch the segment lineage
       ZNRecord segmentLineageToUpdateZNRecord =
-          SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableNameWithType);
+          SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName());
       int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
       SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
       LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId);
@@ -3792,13 +3823,28 @@ public class PinotHelixResourceManager {
       if (!currentLineageEntry.equals(lineageEntryToMatch)) {
         String errorMsg = String.format(
             "Aborting the to update lineage entry since we find that the entry has been modified for table %s, "
-                + "entry id: %s", tableNameWithType, lineageEntryId);
+                + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
         LOGGER.error(errorMsg);
         throw new RuntimeException(errorMsg);
       }
 
       // Update lineage entry
       segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate);
+      switch (lineageUpdateType) {
+        case START:
+          _lineageManager
+              .updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+          break;
+        case END:
+          _lineageManager
+              .updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+          break;
+        case REVERT:
+          _lineageManager
+              .updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+          break;
+        default:
+      }
 
       // Write back to the lineage entry
       if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
new file mode 100644
index 0000000000..225c65b717
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class DefaultLineageManager implements LineageManager {
+  private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
+  private static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
+
+  protected ControllerConf _controllerConf;
+
+  public DefaultLineageManager(ControllerConf controllerConf) {
+    _controllerConf = controllerConf;
+  }
+
+  @Override
+  public void updateLineageForStartReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+      Map<String, String> customMap, SegmentLineage lineage) { // no-op in the default implementation
+  }
+
+  @Override
+  public void updateLineageForEndReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+      Map<String, String> customMap, SegmentLineage lineage) { // no-op in the default implementation
+  }
+
+  @Override
+  public void updateLineageForRevertReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+      Map<String, String> customMap, SegmentLineage lineage) { // no-op in the default implementation
+  }
+
+  /**
+   * Garbage-collects the given lineage in place:
+   * 1. Removes lineage entries that have no segments left to clean up in {@code allSegments}
+   * 2. Adds the segments that need to be deleted to {@code segmentsToDelete}; {@code consumingSegments} is ignored
+   */
+  @Override
+  public void updateLineageForRetention(TableConfig tableConfig, SegmentLineage lineage, List<String> allSegments,
+      List<String> segmentsToDelete, Set<String> consumingSegments) {
+    // 1. The original segments can be deleted once the merged segments are successfully uploaded
+    // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+    //    the middle
+    Set<String> segmentsForTable = new HashSet<>(allSegments);
+    Iterator<LineageEntry> lineageEntryIterator = lineage.getLineageEntries().values().iterator();
+    while (lineageEntryIterator.hasNext()) {
+      LineageEntry lineageEntry = lineageEntryIterator.next();
+      if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+        Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
+        sourceSegments.retainAll(segmentsForTable);
+        if (sourceSegments.isEmpty()) {
+          // If the lineage state is 'COMPLETED' and segmentsFrom are removed, it is safe to clean up the entry
+          lineageEntryIterator.remove();
+        } else {
+          // If the lineage state is 'COMPLETED' and we already preserved the original segments for the required
+          // retention, it is safe to delete all segments from 'segmentsFrom'
+          if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
+            segmentsToDelete.addAll(sourceSegments);
+          }
+        }
+      } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+          lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
+              < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
+        // If the lineage state is 'REVERTED', or 'IN_PROGRESS' for longer than the cleanup retention, we need to
+        // clean up the zombie lineage entry and its segments
+        Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
+        destinationSegments.retainAll(segmentsForTable);
+        if (destinationSegments.isEmpty()) {
+          // If the lineage state is 'IN_PROGRESS or REVERTED' and segmentsTo are already removed, it is safe
+          // to clean up the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source
+          // segments to be merged again.
+          lineageEntryIterator.remove();
+        } else {
+          // If the lineage state is 'IN_PROGRESS' or 'REVERTED', it is safe to delete all segments from 'segmentsTo'
+          segmentsToDelete.addAll(destinationSegments);
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Helper function to decide whether we should delete segmentsFrom (replaced segments) given a lineage entry.
+   *
+   * The replaced segments are safe to delete if either of the following conditions is satisfied
+   * 1) The table's batch segment ingestion type is not "REFRESH"
+   * 2) The lineage entry's timestamp is more than 1 day (REPLACED_SEGMENTS_RETENTION_IN_MILLIS) in the past.
+   *
+   * @param tableConfig table config used to look up the batch segment ingestion type
+   * @param lineageEntry lineage entry whose timestamp is checked
+   * @return True if we can safely delete the replaced segments. False otherwise.
+   */
+  private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry) {
+    // TODO: Currently, we preserve the replaced segments for 1 day for REFRESH tables only. Once we support
+    // data rollback for APPEND tables, we should remove this check.
+    String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+    if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
+        || lineageEntry.getTimestamp() < System.currentTimeMillis() - REPLACED_SEGMENTS_RETENTION_IN_MILLIS) {
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java
new file mode 100644
index 0000000000..c829ef243b
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Interface to update lineage metadata based on custom inputs and garbage collect lineage entries
+ */
+public interface LineageManager {
+
+  /**
+   * Update lineage based on customMap for start replace calls
+   * @param tableConfig config of the table that owns the lineage
+   * @param lineageEntryId id of the lineage entry affected by the start replace call
+   * @param customMap custom key-value inputs for the call; presumably may be null (TODO confirm)
+   * @param lineage segment lineage to modify in place; the caller writes it back afterwards
+   */
+  void updateLineageForStartReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+      Map<String, String> customMap, SegmentLineage lineage);
+
+  /**
+   * Update lineage based on customMap for end replace calls
+   * @param tableConfig config of the table that owns the lineage
+   * @param lineageEntryId id of the lineage entry affected by the end replace call
+   * @param customMap custom key-value inputs for the call; presumably may be null (TODO confirm)
+   * @param lineage segment lineage to modify in place; the caller writes it back afterwards
+   */
+  void updateLineageForEndReplaceSegments(TableConfig tableConfig, String lineageEntryId, Map<String, String> customMap,
+      SegmentLineage lineage);
+
+  /**
+   * Update lineage based on customMap for revert replace calls
+   * @param tableConfig config of the table that owns the lineage
+   * @param lineageEntryId id of the lineage entry affected by the revert replace call
+   * @param customMap custom key-value inputs for the call; presumably may be null (TODO confirm)
+   * @param lineage segment lineage to modify in place; the caller writes it back afterwards
+   */
+  void updateLineageForRevertReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+      Map<String, String> customMap, SegmentLineage lineage);
+
+  /**
+   * Update lineage for retention purposes
+   * @param tableConfig config of the table that owns the lineage
+   * @param lineage segment lineage to modify in place; the caller writes it back afterwards
+   * @param allSegments segments of the table, as listed by the caller
+   * @param segmentsToDelete output list to which implementations append the segments that should be deleted
+   * @param consumingSegments consuming segments of the table, as reported by the caller
+   */
+  void updateLineageForRetention(TableConfig tableConfig, SegmentLineage lineage, List<String> allSegments,
+      List<String> segmentsToDelete, Set<String> consumingSegments);
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java
new file mode 100644
index 0000000000..1a07dfefac
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory class to create the configured {@link LineageManager} via its public {@code (ControllerConf)} constructor
+ */
+public class LineageManagerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LineageManagerFactory.class);
+
+  private LineageManagerFactory() {
+  }
+
+  public static LineageManager create(ControllerConf controllerConf) {
+    String lineageManagerClassName = controllerConf.getLineageManagerClass();
+    try {
+      return (LineageManager) Class.forName(lineageManagerClassName).getConstructor(ControllerConf.class)
+          .newInstance(controllerConf);
+    } catch (Exception e) { // class lookup, constructor lookup, instantiation or cast failure
+      LOGGER.error("Failed to create LineageManager: {}", lineageManagerClassName, e);
+      throw new RuntimeException("Failed to create LineageManager: " + lineageManagerClassName, e);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 977733dbec..36443146e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -20,15 +20,12 @@ package org.apache.pinot.controller.helix.core.retention;
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.lineage.LineageEntry;
-import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -59,8 +56,6 @@ import org.slf4j.LoggerFactory;
  */
 public class RetentionManager extends ControllerPeriodicTask<Void> {
   public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(5L);
-  private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
-  public static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
   private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);
@@ -210,46 +205,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
         int expectedVersion = segmentLineageZNRecord.getVersion();
 
-        // 1. The original segments can be deleted once the merged segments are successfully uploaded
-        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
-        //    the middle
-        Set<String> segmentsForTable =
-            new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
+        List<String> segmentsForTable = _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false);
         List<String> segmentsToDelete = new ArrayList<>();
-        Iterator<LineageEntry> lineageEntryIterator = segmentLineage.getLineageEntries().values().iterator();
-        while (lineageEntryIterator.hasNext()) {
-          LineageEntry lineageEntry = lineageEntryIterator.next();
-          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
-            Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
-            sourceSegments.retainAll(segmentsForTable);
-            if (sourceSegments.isEmpty()) {
-              // If the lineage state is 'COMPLETED' and segmentFrom are removed, it is safe clean up the lineage entry
-              lineageEntryIterator.remove();
-            } else {
-              // If the lineage state is 'COMPLETED' and we already preserved the original segments for the required
-              // retention, it is safe to delete all segments from 'segmentsFrom'
-              if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
-                segmentsToDelete.addAll(sourceSegments);
-              }
-            }
-          } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
-              lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
-                  < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
-            // If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to clean up the zombie lineage
-            // entry and its segments
-            Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
-            destinationSegments.retainAll(segmentsForTable);
-            if (destinationSegments.isEmpty()) {
-              // If the lineage state is 'IN_PROGRESS or REVERTED' and source segments are already removed, it is safe
-              // to clean up the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source
-              // segments to be merged again.
-              lineageEntryIterator.remove();
-            } else {
-              // If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
-              segmentsToDelete.addAll(destinationSegments);
-            }
-          }
-        }
+        _pinotHelixResourceManager.getLineageManager()
+            .updateLineageForRetention(tableConfig, segmentLineage, segmentsForTable, segmentsToDelete,
+                _pinotHelixResourceManager.getConsumingSegments(tableNameWithType));
 
         // Write back to the lineage entry
         if (SegmentLineageAccessHelper.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
@@ -274,26 +234,4 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
     }
     LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
   }
-
-  /**
-   * Helper function to decide whether we should delete segmentsFrom (replaced segments) given a lineage entry.
-   *
-   * The replaced segments are safe to delete if the following conditions are all satisfied
-   * 1) Table is "APPEND"
-   * 2) It has been more than 24 hours since the lineage entry became "COMPLETED" state.
-   *
-   * @param tableConfig a table config
-   * @param lineageEntry lineage entry
-   * @return True if we can safely delete the replaced segments. False otherwise.
-   */
-  private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry) {
-    // TODO: Currently, we preserve the replaced segments for 1 day for REFRESH tables only. Once we support
-    // data rollback for APPEND tables, we should remove this check.
-    String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
-    if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
-        || lineageEntry.getTimestamp() < System.currentTimeMillis() - REPLACED_SEGMENTS_RETENTION_IN_MILLIS) {
-      return true;
-    }
-    return false;
-  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 181fe6ef44..5c8713136c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -81,13 +81,14 @@ public class PinotSegmentRestletResourceTest {
     // Now starts to replace segments.
     List<String> segmentsFrom = Arrays.asList("s0", "s1");
     List<String> segmentsTo = Collections.singletonList("some_segment");
-    String segmentLineageId = resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false);
+    String segmentLineageId = resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false,
+        null);
 
     // Replace more segments to add another entry to segment lineage.
     segmentsFrom = Arrays.asList("s2", "s3");
     segmentsTo = Collections.singletonList("another_segment");
     String nextSegmentLineageId =
-        resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false);
+        resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false, null);
 
     // There should now be two segment lineage entries resulting from the operations above.
     segmentLineageResponse =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index d72571b1c5..562b011677 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -805,17 +805,17 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom = Collections.emptyList();
     List<String> segmentsTo = Arrays.asList("s20", "s21");
     String lineageEntryId =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom, segmentsTo, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom, segmentsTo, false, null);
     assertThrows(RuntimeException.class,
         () -> _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId,
-            new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"))));
+            new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"), null)));
     // Try after new segments added to the table
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s20"), "downloadUrl");
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), "downloadUrl");
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId,
-        new EndReplaceSegmentsRequest(Arrays.asList("s21")));
+        new EndReplaceSegmentsRequest(Arrays.asList("s21"), null));
     SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId));
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), segmentsFrom);
@@ -847,7 +847,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom1 = Collections.emptyList();
     List<String> segmentsTo1 = Arrays.asList("s5", "s6");
     String lineageEntryId1 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false, null);
     SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId1));
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId1).getSegmentsFrom(), segmentsFrom1);
@@ -857,15 +857,15 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Check invalid segmentsTo
     assertThrows(IllegalStateException.class,
         () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s2"),
-            Arrays.asList("s3", "s4"), false));
+            Arrays.asList("s3", "s4"), false, null));
     assertThrows(IllegalStateException.class,
         () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s2"),
-            Collections.singletonList("s2"), false));
+            Collections.singletonList("s2"), false, null));
 
     // Check invalid segmentsFrom
     assertThrows(IllegalStateException.class,
         () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s6"),
-            Collections.singletonList("s7"), false));
+            Collections.singletonList("s7"), false, null));
 
     // Invalid table
     assertThrows(RuntimeException.class,
@@ -898,7 +898,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom2 = Arrays.asList("s1", "s2");
     List<String> segmentsTo2 = Arrays.asList("merged_t1_0", "merged_t1_1");
     String lineageEntryId2 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom2);
@@ -914,10 +914,10 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Revert the entry with partial data uploaded without forceRevert
     assertThrows(RuntimeException.class,
-        () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, false));
+        () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, false, null));
 
     // Revert the entry with partial data uploaded with forceRevert
-    _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, true);
+    _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED);
 
@@ -930,7 +930,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom3 = Arrays.asList("s1", "s2");
     List<String> segmentsTo3 = Arrays.asList("merged_t2_0", "merged_t2_1");
     String lineageEntryId3 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), segmentsFrom3);
@@ -945,11 +945,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom4 = Arrays.asList("s1", "s2");
     List<String> segmentsTo4 = Arrays.asList("merged_t3_0", "merged_t3_1");
     assertThrows(RuntimeException.class,
-        () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, false));
+        () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, false, null));
 
     // Test force clean up case
     String lineageEntryId4 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
         lineageEntryId4);
@@ -985,7 +985,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom5 = Collections.emptyList();
     List<String> segmentsTo5 = Arrays.asList("s7", "s8");
     String lineageEntryId5 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
         lineageEntryId4, lineageEntryId5);
@@ -998,7 +998,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom6 = Collections.emptyList();
     List<String> segmentsTo6 = Arrays.asList("s7", "s8");
     String lineageEntryId6 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
         lineageEntryId4, lineageEntryId6);
@@ -1015,7 +1015,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom7 = Collections.emptyList();
     List<String> segmentsTo7 = Arrays.asList("s9", "s10");
     String lineageEntryId7 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS);
@@ -1039,7 +1039,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom8 = Arrays.asList("s9", "s10");
     List<String> segmentsTo8 = Arrays.asList("s11", "s12");
     String lineageEntryId8 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(), segmentsFrom8);
@@ -1055,7 +1055,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom9 = Arrays.asList("s0", "s9");
     List<String> segmentsTo9 = Arrays.asList("s13", "s14");
     String lineageEntryId9 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.REVERTED);
@@ -1081,7 +1081,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom10 = Arrays.asList("s13", "s14");
     List<String> segmentsTo10 = Collections.emptyList();
     String lineageEntryId10 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(), LineageEntryState.IN_PROGRESS);
@@ -1121,7 +1121,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom1 = Arrays.asList("s0", "s1", "s2");
     List<String> segmentsTo1 = Arrays.asList("s3", "s4", "s5");
     String lineageEntryId1 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false, null);
     SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId1));
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId1).getSegmentsFrom(), segmentsFrom1);
@@ -1153,7 +1153,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom2 = Arrays.asList("s3", "s4", "s5");
     List<String> segmentsTo2 = Arrays.asList("s6", "s7", "s8");
     String lineageEntryId2 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom2);
@@ -1164,7 +1164,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Reverting the first entry should fail
     assertThrows(RuntimeException.class,
-        () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId1, false));
+        () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId1, false, null));
 
     // Add partial segments to indicate incomplete protocol
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
@@ -1180,7 +1180,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom3 = Arrays.asList("s3", "s4", "s5");
     List<String> segmentsTo3 = Arrays.asList("s9", "s10", "s11");
     String lineageEntryId3 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
 
@@ -1215,7 +1215,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
     // deleted (s9, s10, s11).
-    _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
+    _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false, null);
     assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");
 
@@ -1231,7 +1231,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom4 = Arrays.asList("s3", "s4", "s5");
     List<String> segmentsTo4 = Arrays.asList("s12", "s13", "s14");
     String lineageEntryId4 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true, null);
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false), "s3", "s4", "s5");
 
     // Upload the new segments (s12, s13, s14)
@@ -1251,7 +1251,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom5 = Collections.emptyList();
     List<String> segmentsTo5 = Arrays.asList("s15", "s16");
     String lineageEntryId5 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsFrom(), segmentsFrom5);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsTo(), segmentsTo5);
@@ -1266,7 +1266,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom6 = Collections.emptyList();
     List<String> segmentsTo6 = Arrays.asList("s17", "s18");
     String lineageEntryId6 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), LineageEntryState.IN_PROGRESS);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS);
@@ -1287,7 +1287,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom7 = Arrays.asList("s17", "s18");
     List<String> segmentsTo7 = Arrays.asList("s19", "s20");
     String lineageEntryId7 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom(), segmentsFrom7);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(), segmentsTo7);
@@ -1302,7 +1302,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom8 = Arrays.asList("s14", "s17");
     List<String> segmentsTo8 = Arrays.asList("s21", "s22");
     String lineageEntryId8 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.REVERTED);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.IN_PROGRESS);
@@ -1324,7 +1324,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom9 = Arrays.asList("s21", "s22");
     List<String> segmentsTo9 = Arrays.asList("s23", "s24");
     String lineageEntryId9 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, false);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, false, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(), segmentsFrom9);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(), segmentsTo9);
@@ -1341,7 +1341,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     List<String> segmentsFrom10 = Arrays.asList("s21", "s22");
     List<String> segmentsTo10 = Arrays.asList("s24", "s25");
     String lineageEntryId10 =
-        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true);
+        _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true, null);
     segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(), segmentsFrom9);
     assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(), Collections.singletonList("s23"));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 99b1341f6d..5842d4f672 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -148,7 +148,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
               .collect(Collectors.toList());
       String lineageEntryId =
           SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(), context.getUploadURL(),
-              new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthProvider());
+              new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo, null), context.getAuthProvider());
       context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, lineageEntryId);
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
index 7296cc7928..fda44e2139 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
@@ -120,7 +120,7 @@ public class ConsistentDataPushUtils {
       List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
 
       StartReplaceSegmentsRequest startReplaceSegmentsRequest =
-          new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
+          new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo, null);
       DEFAULT_RETRY_POLICY.attempt(() -> {
         try {
           SimpleHttpResponse response =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org