You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/05/03 21:55:36 UTC
[pinot] branch master updated: Allow custom lineage update and gc (#10679)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1ffd804e4d Allow custom lineage update and gc (#10679)
1ffd804e4d is described below
commit 1ffd804e4da35ac14440a648316213b18b98f41c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Wed May 3 14:55:25 2023 -0700
Allow custom lineage update and gc (#10679)
---
.../pinot/common/lineage/SegmentLineage.java | 32 ++++-
.../resources/EndReplaceSegmentsRequest.java | 16 ++-
...uest.java => RevertReplaceSegmentsRequest.java} | 24 ++--
.../resources/StartReplaceSegmentsRequest.java | 11 +-
.../apache/pinot/controller/ControllerConf.java | 11 ++
.../PinotSegmentUploadDownloadRestletResource.java | 18 +--
.../helix/core/PinotHelixResourceManager.java | 82 ++++++++++---
.../helix/core/lineage/DefaultLineageManager.java | 130 +++++++++++++++++++++
.../helix/core/lineage/LineageManager.java | 73 ++++++++++++
.../helix/core/lineage/LineageManagerFactory.java | 45 +++++++
.../helix/core/retention/RetentionManager.java | 70 +----------
.../api/PinotSegmentRestletResourceTest.java | 5 +-
.../PinotHelixResourceManagerStatelessTest.java | 62 +++++-----
.../BaseMultipleSegmentsConversionExecutor.java | 2 +-
.../local/utils/ConsistentDataPushUtils.java | 2 +-
15 files changed, 440 insertions(+), 143 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
index 353d71ad5a..b3776ddc9f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -46,18 +47,22 @@ import org.apache.pinot.spi.utils.JsonUtils;
*/
public class SegmentLineage {
private static final String COMMA_SEPARATOR = ",";
+ private static final String CUSTOM_MAP_KEY = "custom.map";
private final String _tableNameWithType;
private final Map<String, LineageEntry> _lineageEntries;
+ private Map<String, String> _customMap = null;
public SegmentLineage(String tableNameWithType) {
_tableNameWithType = tableNameWithType;
_lineageEntries = new HashMap<>();
}
- public SegmentLineage(String tableNameWithType, Map<String, LineageEntry> lineageEntries) {
+ public SegmentLineage(String tableNameWithType, Map<String, LineageEntry> lineageEntries,
+ @Nullable Map<String, String> customMap) {
_tableNameWithType = tableNameWithType;
_lineageEntries = lineageEntries;
+ _customMap = customMap;
}
public String getTableNameWithType() {
@@ -115,6 +120,22 @@ public class SegmentLineage {
_lineageEntries.remove(lineageEntryId);
}
+ /**
+ * Retrieve custom map
+ * @return custom map
+ */
+ public Map<String, String> getCustomMap() {
+ return _customMap;
+ }
+
+ /**
+ * Set custom map
+ * @param customMap
+ */
+ public void setCustomMap(Map<String, String> customMap) {
+ _customMap = customMap;
+ }
+
/**
* Convert ZNRecord to segment lineage
* @param record ZNRecord representation of the segment lineage
@@ -124,6 +145,7 @@ public class SegmentLineage {
String tableNameWithType = record.getId();
Map<String, LineageEntry> lineageEntries = new HashMap<>();
Map<String, List<String>> listFields = record.getListFields();
+ Map<String, String> customMap = record.getMapField(CUSTOM_MAP_KEY);
for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
String lineageId = listField.getKey();
List<String> value = listField.getValue();
@@ -134,7 +156,7 @@ public class SegmentLineage {
long timestamp = Long.parseLong(value.get(3));
lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo, state, timestamp));
}
- return new SegmentLineage(tableNameWithType, lineageEntries);
+ return new SegmentLineage(tableNameWithType, lineageEntries, customMap);
}
/**
@@ -152,6 +174,9 @@ public class SegmentLineage {
List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state, timestamp);
znRecord.setListField(entry.getKey(), listEntry);
}
+ if (_customMap != null) {
+ znRecord.setMapField(CUSTOM_MAP_KEY, _customMap);
+ }
return znRecord;
}
@@ -167,6 +192,9 @@ public class SegmentLineage {
.sorted(Map.Entry.comparingByValue(Comparator.comparingLong(LineageEntry::getTimestamp)))
.forEachOrdered(x -> sortedLineageEntries.put(x.getKey(), x.getValue()));
jsonObject.set("lineageEntries", JsonUtils.objectToJsonNode(sortedLineageEntries));
+ if (_customMap != null) {
+ jsonObject.set("customMap", JsonUtils.objectToJsonNode(_customMap));
+ }
return jsonObject;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
index 166c35ab30..5247b41edf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
@@ -21,21 +21,33 @@ package org.apache.pinot.common.restlet.resources;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
/**
- * segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
+ * Request object for endReplaceSegments API.
+ *
+ * 1. segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
* startReplaceSegments can get created. If only a subset of the original list eventually gets created,
* we need to be able to supply that list to the replacement protocol, so that the remaining
* segments that did not get created can be ignored.
+ * 2. customMap : optional custom key-value map handed to the lineage manager when the lineage is updated (may be null).
*/
public class EndReplaceSegmentsRequest {
private final List<String> _segmentsTo;
+ private final Map<String, String> _customMap;
- public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
+ public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo,
+ @JsonProperty("customMap") @Nullable Map<String, String> customMap) {
_segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
+ _customMap = customMap;
}
+
public List<String> getSegmentsTo() {
return _segmentsTo;
}
+
+ public Map<String, String> getCustomMap() {
+ return _customMap;
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
similarity index 56%
copy from pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
copy to pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
index 166c35ab30..b4ed79aaf2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/EndReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/RevertReplaceSegmentsRequest.java
@@ -19,23 +19,23 @@
package org.apache.pinot.common.restlet.resources;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
+
/**
- * segmentsTo: The new segments that actually get created. Sometimes not all segments that are passed into
- * startReplaceSegments can get created. If only a subset of the original list eventually gets created,
- * we need to be able to supply that list to the replacement protocol, so that the remaining
- * segments that did not get created can be ignored.
+ * Request object for revertReplaceSegments API.
+ *
+ * customMap : optional custom key-value map handed to the lineage manager when the lineage is updated (may be null).
*/
-public class EndReplaceSegmentsRequest {
- private final List<String> _segmentsTo;
+public class RevertReplaceSegmentsRequest {
+ private final Map<String, String> _customMap;
- public EndReplaceSegmentsRequest(@JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
- _segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
+ public RevertReplaceSegmentsRequest(@JsonProperty("customMap") @Nullable Map<String, String> customMap) {
+ _customMap = customMap;
}
- public List<String> getSegmentsTo() {
- return _segmentsTo;
+
+ public Map<String, String> getCustomMap() {
+ return _customMap;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
index 933eff4478..523fdb9b26 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
@@ -31,17 +32,21 @@ import javax.annotation.Nullable;
* 1. segmentsFrom : original segments. This field can be empty in case the user tries to upload the original segments
* and wants to achieve the atomic update of multiple segments.
* 2. segmentsTo : merged segments.
+ * 3. customMap : optional custom key-value map handed to the lineage manager when the lineage is updated (may be null).
*/
public class StartReplaceSegmentsRequest {
private final List<String> _segmentsFrom;
private final List<String> _segmentsTo;
+ private final Map<String, String> _customMap;
public StartReplaceSegmentsRequest(@JsonProperty("segmentsFrom") @Nullable List<String> segmentsFrom,
- @JsonProperty("segmentsTo") @Nullable List<String> segmentsTo) {
+ @JsonProperty("segmentsTo") @Nullable List<String> segmentsTo,
+ @JsonProperty("customMap") @Nullable Map<String, String> customMap) {
_segmentsFrom = (segmentsFrom == null) ? Collections.emptyList() : segmentsFrom;
_segmentsTo = (segmentsTo == null) ? Collections.emptyList() : segmentsTo;
Preconditions.checkArgument(!_segmentsFrom.isEmpty() || !_segmentsTo.isEmpty(),
"'segmentsFrom' and 'segmentsTo' cannot both be empty");
+ _customMap = customMap;
}
public List<String> getSegmentsFrom() {
@@ -51,4 +56,8 @@ public class StartReplaceSegmentsRequest {
public List<String> getSegmentsTo() {
return _segmentsTo;
}
+
+ public Map<String, String> getCustomMap() {
+ return _customMap;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8a07fe25be..61141aaa31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -258,6 +258,7 @@ public class ControllerConf extends PinotConfiguration {
public static final String ACCESS_CONTROL_FACTORY_CLASS = "controller.admin.access.control.factory.class";
public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username";
public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password";
+ public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class";
// Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push
// protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one.
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
@@ -285,6 +286,8 @@ public class ControllerConf extends PinotConfiguration {
"org.apache.pinot.controller.api.access.AllowAllAccessFactory";
private static final String DEFAULT_ACCESS_CONTROL_USERNAME = "admin";
private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
+ private static final String DEFAULT_LINEAGE_MANAGER =
+ "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
@@ -833,6 +836,14 @@ public class ControllerConf extends PinotConfiguration {
setProperty(ACCESS_CONTROL_FACTORY_CLASS, accessControlFactoryClass);
}
+ public String getLineageManagerClass() {
+ return getProperty(LINEAGE_MANAGER_CLASS, DEFAULT_LINEAGE_MANAGER);
+ }
+
+ public void setLineageManagerClass(String lineageModifierClass) {
+ setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
+ }
+
public long getSegmentUploadTimeoutInMillis() {
return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index ab02733473..31ca292f6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
+import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.URIUtils;
@@ -630,7 +631,8 @@ public class PinotSegmentUploadDownloadRestletResource {
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
try {
String segmentLineageEntryId = _pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
- startReplaceSegmentsRequest.getSegmentsFrom(), startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
+ startReplaceSegmentsRequest.getSegmentsFrom(), startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
+ startReplaceSegmentsRequest.getCustomMap());
return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build();
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
@@ -646,11 +648,10 @@ public class PinotSegmentUploadDownloadRestletResource {
public Response endReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
- @ApiParam(value = "Fields belonging to end replace segment request")
- EndReplaceSegmentsRequest endReplaceSegmentsRequest,
@ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API", required = true)
- @QueryParam("segmentLineageEntryId") String segmentLineageEntryId
- ) {
+ @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Fields belonging to end replace segment request", required = false)
+ EndReplaceSegmentsRequest endReplaceSegmentsRequest) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
@@ -681,7 +682,9 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiParam(value = "Segment lineage entry id to revert", required = true) @QueryParam("segmentLineageEntryId")
String segmentLineageEntryId,
@ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted")
- @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
+ @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert,
+ @ApiParam(value = "Fields belonging to revert replace segment request", required = false)
+ RevertReplaceSegmentsRequest revertReplaceSegmentsRequest) {
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
@@ -692,7 +695,8 @@ public class PinotSegmentUploadDownloadRestletResource {
try {
// Check that the segment lineage entry id is valid
Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
- _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert);
+ _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert,
+ revertReplaceSegmentsRequest);
return Response.ok().build();
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.NUMBER_REVERT_REPLACE_FAILURE, 1);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5610a19678..5a7e29a829 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -112,6 +112,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
+import org.apache.pinot.common.restlet.resources.RevertReplaceSegmentsRequest;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.BcryptUtils;
@@ -135,6 +136,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.lineage.LineageManager;
+import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -188,6 +191,10 @@ public class PinotHelixResourceManager {
private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
private static final String API_REQUEST_ID_PREFIX = "api-";
+ private enum LineageUpdateType {
+ START, END, REVERT
+ }
+
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes
public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
@@ -219,10 +226,11 @@ public class PinotHelixResourceManager {
private SegmentDeletionManager _segmentDeletionManager;
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableCache _tableCache;
+ private final LineageManager _lineageManager;
public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables,
- int deletedSegmentsRetentionInDays) {
+ int deletedSegmentsRetentionInDays, LineageManager lineageManager) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
@@ -244,12 +252,14 @@ public class PinotHelixResourceManager {
for (int i = 0; i < _tableUpdaterLocks.length; i++) {
_tableUpdaterLocks[i] = new Object();
}
+ _lineageManager = lineageManager;
}
public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(),
- controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays());
+ controllerConf.getHLCTablesAllowed(), controllerConf.getDeletedSegmentsRetentionInDays(),
+ LineageManagerFactory.create(controllerConf));
}
/**
@@ -367,6 +377,15 @@ public class PinotHelixResourceManager {
return _propertyStore;
}
+ /**
+ * Get the lineage manager.
+ *
+ * @return lineage manager
+ */
+ public LineageManager getLineageManager() {
+ return _lineageManager;
+ }
+
/**
* Instance related APIs
*/
@@ -2209,8 +2228,8 @@ public class PinotHelixResourceManager {
tasks.put(jobId, jobMetadata);
if (tasks.size() > CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
- Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
- Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+ Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+ Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
.collect(Collectors.toList()).subList(0, CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@@ -3379,12 +3398,13 @@ public class PinotHelixResourceManager {
* @param segmentsFrom a list of segments to be merged
* @param segmentsTo a list of merged segments
* @param forceCleanup True for enabling the force segment cleanup
+ * @param customMap custom key-value map passed to the lineage manager when the new lineage entry is added
* @return Segment lineage entry id
*
* @throws InvalidConfigException
*/
public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo,
- boolean forceCleanup) {
+ boolean forceCleanup, Map<String, String> customMap) {
// Create a segment lineage entry id
String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
@@ -3534,6 +3554,8 @@ public class PinotHelixResourceManager {
segmentLineage.addLineageEntry(segmentLineageEntryId,
new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+ _lineageManager
+ .updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, segmentLineage);
// Write back to the lineage entry to the property store
if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
// Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it
@@ -3570,9 +3592,9 @@ public class PinotHelixResourceManager {
*
* Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
* metadata.
- *
* @param tableNameWithType
* @param segmentLineageEntryId
+ * @param endReplaceSegmentsRequest optional request body (may be null); its custom map is passed to the lineage manager
*/
public void endReplaceSegments(String tableNameWithType, String segmentLineageEntryId,
@Nullable EndReplaceSegmentsRequest endReplaceSegmentsRequest) {
@@ -3630,8 +3652,11 @@ public class PinotHelixResourceManager {
new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsTo, LineageEntryState.COMPLETED,
System.currentTimeMillis());
- if (writeLineageEntryWithTightLoop(tableNameWithType, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
- _propertyStore)) {
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Map<String, String> customMap =
+ endReplaceSegmentsRequest == null ? null : endReplaceSegmentsRequest.getCustomMap();
+ if (writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
+ _propertyStore, LineageUpdateType.END, customMap)) {
// If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
// routing table because it is possible that there has been no EV change but the routing result may be
// different after updating the lineage entry.
@@ -3671,11 +3696,12 @@ public class PinotHelixResourceManager {
*
* Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
* metadata.
- *
* @param tableNameWithType
* @param segmentLineageEntryId
+ * @param revertReplaceSegmentsRequest optional request body (may be null); its custom map is passed to the lineage manager
*/
- public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
+ public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert,
+ @Nullable RevertReplaceSegmentsRequest revertReplaceSegmentsRequest) {
try {
DEFAULT_RETRY_POLICY.attempt(() -> {
// Fetch the segment lineage metadata
@@ -3739,8 +3765,11 @@ public class PinotHelixResourceManager {
new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
System.currentTimeMillis());
- if (writeLineageEntryWithTightLoop(tableNameWithType, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
- _propertyStore)) {
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Map<String, String> customMap =
+ revertReplaceSegmentsRequest == null ? null : revertReplaceSegmentsRequest.getCustomMap();
+ if (writeLineageEntryWithTightLoop(tableConfig, segmentLineageEntryId, lineageEntryToUpdate, lineageEntry,
+ _propertyStore, LineageUpdateType.REVERT, customMap)) {
// If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
// routing table because it is possible that there has been no EV change but the routing result may be
// different after updating the lineage entry.
@@ -3770,20 +3799,22 @@ public class PinotHelixResourceManager {
/**
* Update the lineage entry with the tight loop to increase the chance for successful ZK write.
- *
- * @param tableNameWithType table name with type
+ * @param tableConfig table config
* @param lineageEntryId lineage entry id
* @param lineageEntryToUpdate lineage entry that needs to be updated
* @param lineageEntryToMatch lineage entry that needs to match with the entry from the newly fetched segment lineage.
* @param propertyStore property store
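+ * @param lineageUpdateType type of the update (START, END or REVERT); selects which lineage manager method is invoked
+ * @param customMap custom key-value map passed to the selected lineage manager method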
*/
- private boolean writeLineageEntryWithTightLoop(String tableNameWithType, String lineageEntryId,
+ private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId,
LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ ZkHelixPropertyStore<ZNRecord> propertyStore,
+ LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
// Fetch the segment lineage
ZNRecord segmentLineageToUpdateZNRecord =
- SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableNameWithType);
+ SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName());
int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId);
@@ -3792,13 +3823,28 @@ public class PinotHelixResourceManager {
if (!currentLineageEntry.equals(lineageEntryToMatch)) {
String errorMsg = String.format(
"Aborting the to update lineage entry since we find that the entry has been modified for table %s, "
- + "entry id: %s", tableNameWithType, lineageEntryId);
+ + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
LOGGER.error(errorMsg);
throw new RuntimeException(errorMsg);
}
// Update lineage entry
segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate);
+ switch (lineageUpdateType) {
+ case START:
+ _lineageManager
+ .updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+ break;
+ case END:
+ _lineageManager
+ .updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+ break;
+ case REVERT:
+ _lineageManager
+ .updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, segmentLineageToUpdate);
+ break;
+ default:
+ }
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
new file mode 100644
index 0000000000..225c65b717
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class DefaultLineageManager implements LineageManager {
+ private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
+ private static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
+
+ protected ControllerConf _controllerConf;
+
+ public DefaultLineageManager(ControllerConf controllerConf) {
+ _controllerConf = controllerConf;
+ }
+
+ @Override
+ public void updateLineageForStartReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+ Map<String, String> customMap, SegmentLineage lineage) { // No-op in the default implementation
+ }
+
+ @Override
+ public void updateLineageForEndReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+ Map<String, String> customMap, SegmentLineage lineage) { // No-op in the default implementation
+ }
+
+ @Override
+ public void updateLineageForRevertReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+ Map<String, String> customMap, SegmentLineage lineage) { // No-op in the default implementation
+ }
+
+ /**
+ * Cleans up the given lineage and collects the segments that should be deleted.
+ * This method:
+ * 1. Update lineage metadata by removing lineage entries
+ * 2. Find segments that need to be deleted
+ *
+ * Entries are removed through the iterator over {@code lineage.getLineageEntries()}; nothing is persisted and no
+ * segment is deleted here, both are left to the caller.
+ *
+ * @param tableConfig table config, used to decide whether replaced segments may already be deleted
+ * @param lineage segment lineage to clean up
+ * @param allSegments segments of the table; lineage segments that are not in this list are treated as removed
+ * @param segmentsToDelete output list, the segments found to be deletable are appended to it
+ * @param consumingSegments not used by this default implementation
+ */
+ @Override
+ public void updateLineageForRetention(TableConfig tableConfig, SegmentLineage lineage, List<String> allSegments,
+ List<String> segmentsToDelete, Set<String> consumingSegments) {
+ // 1. The original segments can be deleted once the merged segments are successfully uploaded
+ // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+ // the middle
+ Set<String> segmentsForTable = new HashSet<>(allSegments);
+ Iterator<LineageEntry> lineageEntryIterator = lineage.getLineageEntries().values().iterator();
+ while (lineageEntryIterator.hasNext()) {
+ LineageEntry lineageEntry = lineageEntryIterator.next();
+ if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+ Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
+ sourceSegments.retainAll(segmentsForTable);
+ if (sourceSegments.isEmpty()) {
+ // If the lineage state is 'COMPLETED' and segmentsFrom are removed, it is safe to clean up the lineage entry
+ lineageEntryIterator.remove();
+ } else {
+ // If the lineage state is 'COMPLETED' and we already preserved the original segments for the required
+ // retention, it is safe to delete all remaining segments from 'segmentsFrom'
+ if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
+ segmentsToDelete.addAll(sourceSegments);
+ }
+ }
+ } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+ lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
+ < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
+ // If the lineage state is 'REVERTED', or 'IN_PROGRESS' with a timestamp older than the cleanup retention,
+ // we need to clean up the zombie lineage entry and its segments
+ Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
+ destinationSegments.retainAll(segmentsForTable);
+ if (destinationSegments.isEmpty()) {
+ // If the lineage state is 'IN_PROGRESS or REVERTED' and destination segments ('segmentsTo') are already
+ // removed, it is safe to clean up the lineage entry. Deleting lineage will allow the task scheduler to
+ // re-schedule the source segments to be merged again.
+ lineageEntryIterator.remove();
+ } else {
+ // Otherwise, it is safe to delete all remaining segments from 'segmentsTo'
+ segmentsToDelete.addAll(destinationSegments);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Helper function to decide whether we should delete segmentsFrom (replaced segments) given a lineage entry.
+ *
+ * The replaced segments are safe to delete if at least one of the following conditions is satisfied
+ * 1) Table's batch segment ingestion type is not "REFRESH" (e.g. "APPEND")
+ * 2) The lineage entry's timestamp is more than 24 hours (REPLACED_SEGMENTS_RETENTION_IN_MILLIS) old.
+ *
+ * @param tableConfig a table config
+ * @param lineageEntry lineage entry
+ * @return True if we can safely delete the replaced segments. False otherwise.
+ */
+ private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry) {
+ // TODO: Currently, we preserve the replaced segments for 1 day for REFRESH tables only. Once we support
+ // data rollback for APPEND tables, we should remove this check.
+ String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+ if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
+ || lineageEntry.getTimestamp() < System.currentTimeMillis() - REPLACED_SEGMENTS_RETENTION_IN_MILLIS) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java
new file mode 100644
index 0000000000..c829ef243b
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Interface to update lineage metadata based on custom inputs and garbage collect lineage entries
+ */
+public interface LineageManager {
+
+ /**
+ * Update lineage based on customMap for start replace calls
+ * @param tableConfig table config of the table whose lineage is updated
+ * @param lineageEntryId id of the lineage entry touched by the start replace call
+ * @param customMap custom inputs for the update; NOTE(review): looks nullable (tests pass null for it) - confirm
+ * @param lineage segment lineage to update; the caller writes it back after this call returns
+ */
+ void updateLineageForStartReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+ Map<String, String> customMap, SegmentLineage lineage);
+
+ /**
+ * Update lineage based on customMap for end replace calls
+ * @param tableConfig table config of the table whose lineage is updated
+ * @param lineageEntryId id of the lineage entry touched by the end replace call
+ * @param customMap custom inputs for the update; NOTE(review): looks nullable (tests pass null for it) - confirm
+ * @param lineage segment lineage to update; the caller writes it back after this call returns
+ */
+ void updateLineageForEndReplaceSegments(TableConfig tableConfig, String lineageEntryId, Map<String, String> customMap,
+ SegmentLineage lineage);
+
+ /**
+ * Update lineage based on customMap for revert replace calls
+ * @param tableConfig table config of the table whose lineage is updated
+ * @param lineageEntryId id of the lineage entry touched by the revert replace call
+ * @param customMap custom inputs for the update; NOTE(review): looks nullable (tests pass null for it) - confirm
+ * @param lineage segment lineage to update; the caller writes it back after this call returns
+ */
+ void updateLineageForRevertReplaceSegments(TableConfig tableConfig, String lineageEntryId,
+ Map<String, String> customMap, SegmentLineage lineage);
+
+ /**
+ * Update lineage for retention purposes
+ * @param tableConfig table config of the table whose lineage is cleaned up
+ * @param lineage segment lineage to clean up; the caller writes it back after this call returns
+ * @param allSegments segments of the table, as listed by the caller
+ * @param segmentsToDelete output list; implementations add the segments that should be deleted
+ * @param consumingSegments consuming segments of the table, as listed by the caller
+ */
+ void updateLineageForRetention(TableConfig tableConfig, SegmentLineage lineage, List<String> allSegments,
+ List<String> segmentsToDelete, Set<String> consumingSegments);
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java
new file mode 100644
index 0000000000..1a07dfefac
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/LineageManagerFactory.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.lineage;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory class to create {@link LineageManager} based on controller configs
+ */
+public class LineageManagerFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LineageManagerFactory.class);
+
+ private LineageManagerFactory() {
+ }
+
+ public static LineageManager create(ControllerConf controllerConf) {
+ String lineageManagerClassName = controllerConf.getLineageManagerClass();
+ try {
+ return (LineageManager) Class.forName(lineageManagerClassName).getConstructor(ControllerConf.class)
+ .newInstance(controllerConf);
+ } catch (Exception e) {
+ LOGGER.error("LineageManager not found: {}", lineageManagerClassName);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 977733dbec..36443146e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -20,15 +20,12 @@ package org.apache.pinot.controller.helix.core.retention;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.lineage.LineageEntry;
-import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -59,8 +56,6 @@ import org.slf4j.LoggerFactory;
*/
public class RetentionManager extends ControllerPeriodicTask<Void> {
public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(5L);
- private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
- public static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = TimeUnit.DAYS.toMillis(1L); // 1 day
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);
@@ -210,46 +205,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
int expectedVersion = segmentLineageZNRecord.getVersion();
- // 1. The original segments can be deleted once the merged segments are successfully uploaded
- // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
- // the middle
- Set<String> segmentsForTable =
- new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false));
+ List<String> segmentsForTable = _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false);
List<String> segmentsToDelete = new ArrayList<>();
- Iterator<LineageEntry> lineageEntryIterator = segmentLineage.getLineageEntries().values().iterator();
- while (lineageEntryIterator.hasNext()) {
- LineageEntry lineageEntry = lineageEntryIterator.next();
- if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
- Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
- sourceSegments.retainAll(segmentsForTable);
- if (sourceSegments.isEmpty()) {
- // If the lineage state is 'COMPLETED' and segmentFrom are removed, it is safe clean up the lineage entry
- lineageEntryIterator.remove();
- } else {
- // If the lineage state is 'COMPLETED' and we already preserved the original segments for the required
- // retention, it is safe to delete all segments from 'segmentsFrom'
- if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
- segmentsToDelete.addAll(sourceSegments);
- }
- }
- } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
- lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
- < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
- // If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to clean up the zombie lineage
- // entry and its segments
- Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
- destinationSegments.retainAll(segmentsForTable);
- if (destinationSegments.isEmpty()) {
- // If the lineage state is 'IN_PROGRESS or REVERTED' and source segments are already removed, it is safe
- // to clean up the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source
- // segments to be merged again.
- lineageEntryIterator.remove();
- } else {
- // If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
- segmentsToDelete.addAll(destinationSegments);
- }
- }
- }
+ _pinotHelixResourceManager.getLineageManager()
+ .updateLineageForRetention(tableConfig, segmentLineage, segmentsForTable, segmentsToDelete,
+ _pinotHelixResourceManager.getConsumingSegments(tableNameWithType));
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
@@ -274,26 +234,4 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
}
LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
}
-
- /**
- * Helper function to decide whether we should delete segmentsFrom (replaced segments) given a lineage entry.
- *
- * The replaced segments are safe to delete if the following conditions are all satisfied
- * 1) Table is "APPEND"
- * 2) It has been more than 24 hours since the lineage entry became "COMPLETED" state.
- *
- * @param tableConfig a table config
- * @param lineageEntry lineage entry
- * @return True if we can safely delete the replaced segments. False otherwise.
- */
- private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry) {
- // TODO: Currently, we preserve the replaced segments for 1 day for REFRESH tables only. Once we support
- // data rollback for APPEND tables, we should remove this check.
- String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
- if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
- || lineageEntry.getTimestamp() < System.currentTimeMillis() - REPLACED_SEGMENTS_RETENTION_IN_MILLIS) {
- return true;
- }
- return false;
- }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 181fe6ef44..5c8713136c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -81,13 +81,14 @@ public class PinotSegmentRestletResourceTest {
// Now starts to replace segments.
List<String> segmentsFrom = Arrays.asList("s0", "s1");
List<String> segmentsTo = Collections.singletonList("some_segment");
- String segmentLineageId = resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false);
+ String segmentLineageId = resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false,
+ null);
// Replace more segments to add another entry to segment lineage.
segmentsFrom = Arrays.asList("s2", "s3");
segmentsTo = Collections.singletonList("another_segment");
String nextSegmentLineageId =
- resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false);
+ resourceManager.startReplaceSegments(offlineTableName, segmentsFrom, segmentsTo, false, null);
// There should now be two segment lineage entries resulting from the operations above.
segmentLineageResponse =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index d72571b1c5..562b011677 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -805,17 +805,17 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom = Collections.emptyList();
List<String> segmentsTo = Arrays.asList("s20", "s21");
String lineageEntryId =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom, segmentsTo, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom, segmentsTo, false, null);
assertThrows(RuntimeException.class,
() -> _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId,
- new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"))));
+ new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"), null)));
// Try after new segments added to the table
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s20"), "downloadUrl");
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), "downloadUrl");
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId,
- new EndReplaceSegmentsRequest(Arrays.asList("s21")));
+ new EndReplaceSegmentsRequest(Arrays.asList("s21"), null));
SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId));
assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), segmentsFrom);
@@ -847,7 +847,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom1 = Collections.emptyList();
List<String> segmentsTo1 = Arrays.asList("s5", "s6");
String lineageEntryId1 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false, null);
SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId1));
assertEquals(segmentLineage.getLineageEntry(lineageEntryId1).getSegmentsFrom(), segmentsFrom1);
@@ -857,15 +857,15 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
// Check invalid segmentsTo
assertThrows(IllegalStateException.class,
() -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s2"),
- Arrays.asList("s3", "s4"), false));
+ Arrays.asList("s3", "s4"), false, null));
assertThrows(IllegalStateException.class,
() -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s2"),
- Collections.singletonList("s2"), false));
+ Collections.singletonList("s2"), false, null));
// Check invalid segmentsFrom
assertThrows(IllegalStateException.class,
() -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, Arrays.asList("s1", "s6"),
- Collections.singletonList("s7"), false));
+ Collections.singletonList("s7"), false, null));
// Invalid table
assertThrows(RuntimeException.class,
@@ -898,7 +898,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom2 = Arrays.asList("s1", "s2");
List<String> segmentsTo2 = Arrays.asList("merged_t1_0", "merged_t1_1");
String lineageEntryId2 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom2);
@@ -914,10 +914,10 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
// Revert the entry with partial data uploaded without forceRevert
assertThrows(RuntimeException.class,
- () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, false));
+ () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, false, null));
// Revert the entry with partial data uploaded with forceRevert
- _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, true);
+ _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId2, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED);
@@ -930,7 +930,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom3 = Arrays.asList("s1", "s2");
List<String> segmentsTo3 = Arrays.asList("merged_t2_0", "merged_t2_1");
String lineageEntryId3 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), segmentsFrom3);
@@ -945,11 +945,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom4 = Arrays.asList("s1", "s2");
List<String> segmentsTo4 = Arrays.asList("merged_t3_0", "merged_t3_1");
assertThrows(RuntimeException.class,
- () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, false));
+ () -> _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, false, null));
// Test force clean up case
String lineageEntryId4 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
lineageEntryId4);
@@ -985,7 +985,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom5 = Collections.emptyList();
List<String> segmentsTo5 = Arrays.asList("s7", "s8");
String lineageEntryId5 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
lineageEntryId4, lineageEntryId5);
@@ -998,7 +998,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom6 = Collections.emptyList();
List<String> segmentsTo6 = Arrays.asList("s7", "s8");
String lineageEntryId6 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertSetEquals(segmentLineage.getLineageEntryIds(), lineageEntryId1, lineageEntryId2, lineageEntryId3,
lineageEntryId4, lineageEntryId6);
@@ -1015,7 +1015,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom7 = Collections.emptyList();
List<String> segmentsTo7 = Arrays.asList("s9", "s10");
String lineageEntryId7 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS);
@@ -1039,7 +1039,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom8 = Arrays.asList("s9", "s10");
List<String> segmentsTo8 = Arrays.asList("s11", "s12");
String lineageEntryId8 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(), segmentsFrom8);
@@ -1055,7 +1055,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom9 = Arrays.asList("s0", "s9");
List<String> segmentsTo9 = Arrays.asList("s13", "s14");
String lineageEntryId9 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.REVERTED);
@@ -1081,7 +1081,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom10 = Arrays.asList("s13", "s14");
List<String> segmentsTo10 = Collections.emptyList();
String lineageEntryId10 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(), LineageEntryState.IN_PROGRESS);
@@ -1121,7 +1121,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom1 = Arrays.asList("s0", "s1", "s2");
List<String> segmentsTo1 = Arrays.asList("s3", "s4", "s5");
String lineageEntryId1 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom1, segmentsTo1, false, null);
SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId1));
assertEquals(segmentLineage.getLineageEntry(lineageEntryId1).getSegmentsFrom(), segmentsFrom1);
@@ -1153,7 +1153,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom2 = Arrays.asList("s3", "s4", "s5");
List<String> segmentsTo2 = Arrays.asList("s6", "s7", "s8");
String lineageEntryId2 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom2, segmentsTo2, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom2);
@@ -1164,7 +1164,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
// Reverting the first entry should fail
assertThrows(RuntimeException.class,
- () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId1, false));
+ () -> _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId1, false, null));
// Add partial segments to indicate incomplete protocol
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
@@ -1180,7 +1180,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom3 = Arrays.asList("s3", "s4", "s5");
List<String> segmentsTo3 = Arrays.asList("s9", "s10", "s11");
String lineageEntryId3 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom3, segmentsTo3, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
@@ -1215,7 +1215,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
// Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
// deleted (s9, s10, s11).
- _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
+ _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false, null);
assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");
@@ -1231,7 +1231,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom4 = Arrays.asList("s3", "s4", "s5");
List<String> segmentsTo4 = Arrays.asList("s12", "s13", "s14");
String lineageEntryId4 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom4, segmentsTo4, true, null);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false), "s3", "s4", "s5");
// Upload the new segments (s12, s13, s14)
@@ -1251,7 +1251,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom5 = Collections.emptyList();
List<String> segmentsTo5 = Arrays.asList("s15", "s16");
String lineageEntryId5 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom5, segmentsTo5, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsFrom(), segmentsFrom5);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsTo(), segmentsTo5);
@@ -1266,7 +1266,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom6 = Collections.emptyList();
List<String> segmentsTo6 = Arrays.asList("s17", "s18");
String lineageEntryId6 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom6, segmentsTo6, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), LineageEntryState.IN_PROGRESS);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS);
@@ -1287,7 +1287,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom7 = Arrays.asList("s17", "s18");
List<String> segmentsTo7 = Arrays.asList("s19", "s20");
String lineageEntryId7 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom7, segmentsTo7, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom(), segmentsFrom7);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(), segmentsTo7);
@@ -1302,7 +1302,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom8 = Arrays.asList("s14", "s17");
List<String> segmentsTo8 = Arrays.asList("s21", "s22");
String lineageEntryId8 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom8, segmentsTo8, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.REVERTED);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.IN_PROGRESS);
@@ -1324,7 +1324,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom9 = Arrays.asList("s21", "s22");
List<String> segmentsTo9 = Arrays.asList("s23", "s24");
String lineageEntryId9 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, false);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom9, segmentsTo9, false, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(), segmentsFrom9);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(), segmentsTo9);
@@ -1341,7 +1341,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
List<String> segmentsFrom10 = Arrays.asList("s21", "s22");
List<String> segmentsTo10 = Arrays.asList("s24", "s25");
String lineageEntryId10 =
- _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true);
+ _helixResourceManager.startReplaceSegments(OFFLINE_TABLE_NAME, segmentsFrom10, segmentsTo10, true, null);
segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(), segmentsFrom9);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(), Collections.singletonList("s23"));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 99b1341f6d..5842d4f672 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -148,7 +148,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
.collect(Collectors.toList());
String lineageEntryId =
SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(), context.getUploadURL(),
- new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthProvider());
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo, null), context.getAuthProvider());
context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, lineageEntryId);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
index 7296cc7928..fda44e2139 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.java
@@ -120,7 +120,7 @@ public class ConsistentDataPushUtils {
List<String> segmentsFrom = uriToSegmentsFrom.get(controllerUri);
StartReplaceSegmentsRequest startReplaceSegmentsRequest =
- new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo);
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo, null);
DEFAULT_RETRY_POLICY.attempt(() -> {
try {
SimpleHttpResponse response =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org