You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/11/02 00:14:58 UTC

[pinot] branch master updated: Add revertSegmentReplacement API (#7662)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new da98e8b  Add revertSegmentReplacement API (#7662)
da98e8b is described below

commit da98e8b117b0f992ca5bceb084dc7e60321a0164
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Nov 1 17:14:40 2021 -0700

    Add revertSegmentReplacement API (#7662)
---
 .../pinot/common/lineage/LineageEntryState.java    |  2 +-
 .../PinotSegmentUploadDownloadRestletResource.java | 50 ++++++++++--
 .../helix/core/PinotHelixResourceManager.java      | 92 ++++++++++++++++++++--
 .../helix/core/retention/RetentionManager.java     | 29 +++----
 4 files changed, 146 insertions(+), 27 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
index 74407c0..0d5bcd7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntryState.java
@@ -22,5 +22,5 @@ package org.apache.pinot.common.lineage;
  * Enum for represent the state of lineage entry
  */
 public enum LineageEntryState {
-  IN_PROGRESS, COMPLETED
+  IN_PROGRESS, COMPLETED, REVERTED // REVERTED: replacement rolled back via revertReplaceSegments
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index f2c6c45..78de2fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -544,11 +544,15 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "Start to replace segments", notes = "Start to replace segments")
   public Response startReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
       StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
     try {
-      String tableNameWithType =
-          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      TableType tableType = Constants.validateTableType(tableTypeStr);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
+            Response.Status.BAD_REQUEST);
+      }
+      String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
       String segmentLineageEntryId = _pinotHelixResourceManager
           .startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(),
               startReplaceSegmentsRequest.getSegmentsTo());
@@ -565,12 +569,16 @@ public class PinotSegmentUploadDownloadRestletResource {
   @ApiOperation(value = "End to replace segments", notes = "End to replace segments")
   public Response endReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
-      @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API")
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API", required = true)
       @QueryParam("segmentLineageEntryId") String segmentLineageEntryId) {
     try {
-      String tableNameWithType =
-          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      TableType tableType = Constants.validateTableType(tableTypeStr);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER, "Table type should either be offline or realtime",
+            Response.Status.BAD_REQUEST);
+      }
+      String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
       // Check that the segment lineage entry id is valid
       Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null");
       _pinotHelixResourceManager.endReplaceSegments(tableNameWithType, segmentLineageEntryId);
@@ -580,6 +588,34 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/revertReplaceSegments")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Revert segments replacement", notes = "Revert segments replacement")
+  public Response revertReplaceSegments(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Segment lineage entry id to revert", required = true)
+      @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+      @ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted")
+      @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) {
+    // Validate inputs outside the try block so BAD_REQUEST is not rewrapped as INTERNAL_SERVER_ERROR below.
+    TableType tableType = Constants.validateTableType(tableTypeStr);
+    if (tableType == null || segmentLineageEntryId == null) {
+      String errorMsg = tableType == null ? "Table type should either be offline or realtime"
+          : "'segmentLineageEntryId' should not be null";
+      throw new ControllerApplicationException(LOGGER, errorMsg, Response.Status.BAD_REQUEST);
+    }
+    String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+    try {
+      _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, segmentLineageEntryId, forceRevert);
+      return Response.ok().build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile)
       throws IOException {
     // Read segment file or segment metadata file and directly use that information to update zk
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 38ad8b9..71d3ede 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2787,11 +2787,15 @@ public class PinotHelixResourceManager {
         for (String entryId : segmentLineage.getLineageEntryIds()) {
           LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
 
-          // Check that any segment from 'segmentsFrom' does not appear twice.
-          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String.format(
-              "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
-                  + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
-              lineageEntry.getSegmentsFrom(), segmentsFrom));
+          // If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'.
+          if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+            // Check that any segment from 'segmentsFrom' does not appear twice.
+            Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
+                .format(
+                    "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                        + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                    lineageEntry.getSegmentsFrom(), segmentsFrom));
+          }
 
           // Check that merged segments name cannot be the same.
           Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
@@ -2907,6 +2911,84 @@ public class PinotHelixResourceManager {
         tableNameWithType, segmentLineageEntryId);
   }
 
+  /**
+   * Revert the segment replacement by marking its lineage entry as 'REVERTED'.
+   *
+   * 1. Validate that the entry is 'COMPLETED' (or 'IN_PROGRESS' with forceRevert) and 'segmentsFrom' are ONLINE
+   * 2. Update the lineage entry state to "REVERTED", write metadata to the property store and rebuild routing
+   *
+   * Update uses retry logic with a read-modify-write block to atomically update the lineage metadata.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param segmentLineageEntryId id of the lineage entry to revert
+   * @param forceRevert allow reverting an 'IN_PROGRESS' lineage entry (e.g. one that was interrupted)
+   */
+  public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the segment lineage entry id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
+        Preconditions.checkArgument(lineageEntry != null, String
+            .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
+                segmentLineageEntryId));
+
+        // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS', we only allow to
+        // revert when 'forceRevert' is set to true.
+        // Fail by throwing, like the other validations in this block: returning false would be treated like a
+        // failed write-back below and make the retry policy re-attempt a request that is not valid.
+        LineageEntryState lineageEntryState = lineageEntry.getState();
+        Preconditions.checkState(lineageEntryState == LineageEntryState.COMPLETED
+                || (lineageEntryState == LineageEntryState.IN_PROGRESS && forceRevert),
+            "Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType=%s, "
+                + "segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)",
+            tableNameWithType, segmentLineageEntryId, lineageEntryState, forceRevert);
+
+        // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
+        Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
+        Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
+            "Not all segments from 'segmentsFrom' are in ONLINE state in the external view. (tableName = '%s', "
+                + "segmentsFrom = '%s', onlineSegments = '%s')", tableNameWithType, lineageEntry.getSegmentsFrom(),
+            onlineSegments));
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+                System.currentTimeMillis());
+        segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry);
+
+        // Write back to the lineage entry
+        if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
+          // If the segment lineage metadata is successfully updated, we need to trigger brokers to rebuild the
+          // routing table because it is possible that there has been no EV change but the routing result may be
+          // different after updating the lineage entry.
+          sendRoutingTableRebuildMessage(tableNameWithType);
+          return true;
+        } else {
+          return false;
+        }
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
+              tableNameWithType, segmentLineageEntryId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})",
+        tableNameWithType, segmentLineageEntryId);
+  }
+
   private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck)
       throws InterruptedException, TimeoutException {
     long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 67d2d47..2512c78 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -229,20 +229,21 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
               // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
               segmentsToDelete.addAll(sourceSegments);
             }
-          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
-            // If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
-            if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
-              Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
-              destinationSegments.retainAll(segmentsForTable);
-              if (destinationSegments.isEmpty()) {
-                // If the lineage state is 'IN_PROGRESS' and source segments are already removed, it is safe to clean up
-                // the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source segments
-                // to be merged again.
-                segmentLineage.deleteLineageEntry(lineageEntryId);
-              } else {
-                // If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
-                segmentsToDelete.addAll(destinationSegments);
-              }
+          } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+              lineageEntry.getState() == LineageEntryState.IN_PROGRESS && lineageEntry.getTimestamp()
+                  < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
+            // If the lineage state is 'IN_PROGRESS' or 'REVERTED', we need to clean up the zombie lineage
+            // entry and its segments
+            Set<String> destinationSegments = new HashSet<>(lineageEntry.getSegmentsTo());
+            destinationSegments.retainAll(segmentsForTable);
+            if (destinationSegments.isEmpty()) {
+              // If the lineage state is 'IN_PROGRESS or REVERTED' and source segments are already removed, it is safe
+              // to clean up the lineage entry. Deleting lineage will allow the task scheduler to re-schedule the source
+              // segments to be merged again.
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            } else {
+              // If the lineage state is 'IN_PROGRESS', it is safe to delete all segments from 'segmentsTo'
+              segmentsToDelete.addAll(destinationSegments);
             }
           }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org