You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/11/02 00:14:58 UTC
[pinot] branch master updated: Add revertSegmentReplacement API
(#7662)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new da98e8b Add revertSegmentReplacement API (#7662)
da98e8b is described below
commit da98e8b117b0f992ca5bceb084dc7e60321a0164
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Nov 1 17:14:40 2021 -0700
Add revertSegmentReplacement API (#7662)
---
.../pinot/common/lineage/LineageEntryState.java | 2 +-
.../PinotSegmentUploadDownloadRestletResource.java | 50 ++++++++++--
.../helix/core/PinotHelixResourceManager.java | 92 ++++++++++++++++++++--
.../helix/core/retention/RetentionManager.java | 29 +++----
4 files changed, 146 insertions(+), 27 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
index 74407c0..0d5bcd7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
@@ -22,5 +22,5 @@ package org.apache.pinot.common.lineage;
* Enum for represent the state of lineage entry
*/
public enum LineageEntryState {
- IN_PROGRESS, COMPLETED
+ IN_PROGRESS, COMPLETED, REVERTED
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index f2c6c45..78de2fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -544,11 +544,15 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "Start to replace segments", notes = "Start to replace segments")
public Response startReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
try {
- String tableNameWithType =
- TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
String segmentLineageEntryId = _pinotHelixResourceManager
.startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo());
@@ -565,12 +569,16 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiOperation(value = "End to replace segments", notes = "End to replace segments")
public Response endReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
- @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API")
+ @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API", required = true)
@QueryParam("segmentLineageEntryId") String segmentLineageEntryId) {
try {
- String tableNameWithType =
- TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
// Check that the segment lineage entry id is valid
Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
_pinotHelixResourceManager.endReplaceSegments(tableNameWithType, segmentLineageEntryId);
@@ -580,6 +588,34 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+ @POST
+ @Path("segments/{tableName}/revertReplaceSegments")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Revert segments replacement", notes = "Revert segments replacement")
+ public Response revertReplaceSegments(
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id to revert", required = true)
+ @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted")
+ @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
+ try {
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+ // Check that the segment lineage entry id is valid
+ Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
+ _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert);
+ return Response.ok().build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile)
throws IOException {
// Read segment file or segment metadata file and directly use that information to update zk
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 38ad8b9..71d3ede 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2787,11 +2787,15 @@ public class PinotHelixResourceManager {
for (String entryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
- // Check that any segment from 'segmentsFrom' does not appear twice.
- Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String.format(
- "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
- + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
- lineageEntry.getSegmentsFrom(), segmentsFrom));
+ // If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'.
+ if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+ // Check that any segment from 'segmentsFrom' does not appear twice.
+ Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
+ .format(
+ "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+ + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+ lineageEntry.getSegmentsFrom(), segmentsFrom));
+ }
// Check that merged segments name cannot be the same.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
@@ -2907,6 +2911,84 @@ public class PinotHelixResourceManager {
tableNameWithType, segmentLineageEntryId);
}
+ /**
+ * Revert the segment replacement
+ *
+ * 1. Validate the request (the lineage entry exists, its state allows a revert, 'segmentsFrom' are ONLINE)
+ * 2. Update the lineage entry state to "REVERTED" and write metadata to the property store
+ *
+ * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+ * metadata.
+ *
+ * @param tableNameWithType
+ * @param segmentLineageEntryId
+ */
+ public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+ SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+ Preconditions.checkArgument(segmentLineageZNRecord != null, String
+ .format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')",
+ tableNameWithType, segmentLineageEntryId));
+ SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ int expectedVersion = segmentLineageZNRecord.getVersion();
+
+ // Look up the lineage entry based on the segment lineage entry id
+ LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
+ Preconditions.checkArgument(lineageEntry != null, String
+ .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
+ segmentLineageEntryId));
+
+ if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+ // We do not allow reverting a lineage entry that is already in 'REVERTED' state. For 'IN_PROGRESS', we only
+ // allow the revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) {
+ LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, "
+ + "segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+ return false;
+ }
+ }
+
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
+ Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
+ Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
+ "Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', "
+ + "segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(),
+ onlineSegments));
+
+ // Update lineage entry
+ LineageEntry newLineageEntry =
+ new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+ System.currentTimeMillis());
+ segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry);
+
+ // Write the updated segment lineage back to the property store
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
+ // If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
+ // routing table because it is possible that there has been no EV change but the routing result may be
+ // different after updating the lineage entry.
+ sendRoutingTableRebuildMessage(tableNameWithType);
+ return true;
+ } else {
+ return false;
+ }
+ });
+ } catch (Exception e) {
+ String errorMsg = String
+ .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
+ tableNameWithType, segmentLineageEntryId);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+
+ // Only successful attempt can reach here
+ LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})",
+ tableNameWithType, segmentLineageEntryId);
+ }
+
private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 67d2d47..2512c78 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -229,20 +229,21 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
// If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
segmentsToDelete.addAll(sourceSegments);
}
- } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
- // If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
- if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
- Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
- destinationSegments.retainAll(segmentsForTable);
- if (destinationSegments.isEmpty()) {
- // If the lineage state is 'IN_PROGRESS' and source segments are already removed, it is safe to clean up
- // the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source segments
- // to be merged again.
- segmentLineage.deleteLineageEntry(lineageEntryId);
- } else {
- // If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
- segmentsToDelete.addAll(destinationSegments);
- }
+ } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+ lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
+ < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
+ // If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to clean up the zombie lineage
+ // entry and its segments
+ Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
+ destinationSegments.retainAll(segmentsForTable);
+ if (destinationSegments.isEmpty()) {
+ // If the lineage state is 'IN_PROGRESS' or 'REVERTED' and destination segments are already removed, it is
+ // safe to clean up the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the
+ // source segments to be merged again.
+ segmentLineage.deleteLineageEntry(lineageEntryId);
+ } else {
+ // If the lineage state is 'IN_PROGRESS' or 'REVERTED', it is safe to delete all segments from 'segmentsTo'
+ segmentsToDelete.addAll(destinationSegments);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org